gkdbus: Fix underflow and unreachable code bug
[platform/upstream/glib.git] / gio / goutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * SPDX-License-Identifier: LGPL-2.1-or-later
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General
18  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24 #include <string.h>
25 #include "goutputstream.h"
26 #include "gcancellable.h"
27 #include "gasyncresult.h"
28 #include "gtask.h"
29 #include "ginputstream.h"
30 #include "gioerror.h"
31 #include "gioprivate.h"
32 #include "glibintl.h"
33 #include "gpollableoutputstream.h"
34
35 /**
36  * SECTION:goutputstream
37  * @short_description: Base class for implementing streaming output
38  * @include: gio/gio.h
39  *
40  * #GOutputStream has functions to write to a stream (g_output_stream_write()),
41  * to close a stream (g_output_stream_close()) and to flush pending writes
42  * (g_output_stream_flush()). 
43  *
44  * To copy the content of an input stream to an output stream without 
45  * manually handling the reads and writes, use g_output_stream_splice().
46  *
47  * See the documentation for #GIOStream for details of thread safety of
48  * streaming APIs.
49  *
50  * All of these functions have async variants too.
51  **/
52
53 struct _GOutputStreamPrivate {
54   guint closed : 1;
55   guint pending : 1;
56   guint closing : 1;
57   GAsyncReadyCallback outstanding_callback;
58 };
59
60 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GOutputStream, g_output_stream, G_TYPE_OBJECT)
61
62 static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
63                                                     GInputStream              *source,
64                                                     GOutputStreamSpliceFlags   flags,
65                                                     GCancellable              *cancellable,
66                                                     GError                   **error);
67 static void     g_output_stream_real_write_async   (GOutputStream             *stream,
68                                                     const void                *buffer,
69                                                     gsize                      count,
70                                                     int                        io_priority,
71                                                     GCancellable              *cancellable,
72                                                     GAsyncReadyCallback        callback,
73                                                     gpointer                   data);
74 static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
75                                                     GAsyncResult              *result,
76                                                     GError                   **error);
77 static gboolean g_output_stream_real_writev        (GOutputStream             *stream,
78                                                     const GOutputVector       *vectors,
79                                                     gsize                      n_vectors,
80                                                     gsize                     *bytes_written,
81                                                     GCancellable              *cancellable,
82                                                     GError                   **error);
83 static void     g_output_stream_real_writev_async  (GOutputStream             *stream,
84                                                     const GOutputVector       *vectors,
85                                                     gsize                      n_vectors,
86                                                     int                        io_priority,
87                                                     GCancellable              *cancellable,
88                                                     GAsyncReadyCallback        callback,
89                                                     gpointer                   data);
90 static gboolean g_output_stream_real_writev_finish (GOutputStream             *stream,
91                                                     GAsyncResult              *result,
92                                                     gsize                     *bytes_written,
93                                                     GError                   **error);
94 static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
95                                                     GInputStream              *source,
96                                                     GOutputStreamSpliceFlags   flags,
97                                                     int                        io_priority,
98                                                     GCancellable              *cancellable,
99                                                     GAsyncReadyCallback        callback,
100                                                     gpointer                   data);
101 static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
102                                                     GAsyncResult              *result,
103                                                     GError                   **error);
104 static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
105                                                     int                        io_priority,
106                                                     GCancellable              *cancellable,
107                                                     GAsyncReadyCallback        callback,
108                                                     gpointer                   data);
109 static gboolean g_output_stream_real_flush_finish  (GOutputStream             *stream,
110                                                     GAsyncResult              *result,
111                                                     GError                   **error);
112 static void     g_output_stream_real_close_async   (GOutputStream             *stream,
113                                                     int                        io_priority,
114                                                     GCancellable              *cancellable,
115                                                     GAsyncReadyCallback        callback,
116                                                     gpointer                   data);
117 static gboolean g_output_stream_real_close_finish  (GOutputStream             *stream,
118                                                     GAsyncResult              *result,
119                                                     GError                   **error);
120 static gboolean g_output_stream_internal_close     (GOutputStream             *stream,
121                                                     GCancellable              *cancellable,
122                                                     GError                   **error);
123 static void     g_output_stream_internal_close_async (GOutputStream           *stream,
124                                                       int                      io_priority,
125                                                       GCancellable            *cancellable,
126                                                       GAsyncReadyCallback      callback,
127                                                       gpointer                 data);
128 static gboolean g_output_stream_internal_close_finish (GOutputStream          *stream,
129                                                        GAsyncResult           *result,
130                                                        GError                **error);
131
132 static void
133 g_output_stream_dispose (GObject *object)
134 {
135   GOutputStream *stream;
136
137   stream = G_OUTPUT_STREAM (object);
138   
139   if (!stream->priv->closed)
140     g_output_stream_close (stream, NULL, NULL);
141
142   G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object);
143 }
144
145 static void
146 g_output_stream_class_init (GOutputStreamClass *klass)
147 {
148   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
149
150   gobject_class->dispose = g_output_stream_dispose;
151
152   klass->splice = g_output_stream_real_splice;
153   
154   klass->write_async = g_output_stream_real_write_async;
155   klass->write_finish = g_output_stream_real_write_finish;
156   klass->writev_fn = g_output_stream_real_writev;
157   klass->writev_async = g_output_stream_real_writev_async;
158   klass->writev_finish = g_output_stream_real_writev_finish;
159   klass->splice_async = g_output_stream_real_splice_async;
160   klass->splice_finish = g_output_stream_real_splice_finish;
161   klass->flush_async = g_output_stream_real_flush_async;
162   klass->flush_finish = g_output_stream_real_flush_finish;
163   klass->close_async = g_output_stream_real_close_async;
164   klass->close_finish = g_output_stream_real_close_finish;
165 }
166
167 static void
168 g_output_stream_init (GOutputStream *stream)
169 {
170   stream->priv = g_output_stream_get_instance_private (stream);
171 }
172
173 /**
174  * g_output_stream_write:
175  * @stream: a #GOutputStream.
176  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
177  * @count: the number of bytes to write
178  * @cancellable: (nullable): optional cancellable object
179  * @error: location to store the error occurring, or %NULL to ignore
180  *
181  * Tries to write @count bytes from @buffer into the stream. Will block
182  * during the operation.
183  * 
184  * If count is 0, returns 0 and does nothing. A value of @count
185  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
186  *
187  * On success, the number of bytes written to the stream is returned.
188  * It is not an error if this is not the same as the requested size, as it
189  * can happen e.g. on a partial I/O error, or if there is not enough
190  * storage in the stream. All writes block until at least one byte
191  * is written or an error occurs; 0 is never returned (unless
192  * @count is 0).
193  * 
194  * If @cancellable is not %NULL, then the operation can be cancelled by
195  * triggering the cancellable object from another thread. If the operation
196  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
197  * operation was partially finished when the operation was cancelled the
198  * partial result will be returned, without an error.
199  *
200  * On error -1 is returned and @error is set accordingly.
201  * 
202  * Virtual: write_fn
203  *
204  * Returns: Number of bytes written, or -1 on error
205  **/
206 gssize
207 g_output_stream_write (GOutputStream  *stream,
208                        const void     *buffer,
209                        gsize           count,
210                        GCancellable   *cancellable,
211                        GError        **error)
212 {
213   GOutputStreamClass *class;
214   gssize res;
215
216   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
217   g_return_val_if_fail (buffer != NULL, 0);
218
219   if (count == 0)
220     return 0;
221   
222   if (((gssize) count) < 0)
223     {
224       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
225                    _("Too large count value passed to %s"), G_STRFUNC);
226       return -1;
227     }
228
229   class = G_OUTPUT_STREAM_GET_CLASS (stream);
230
231   if (class->write_fn == NULL) 
232     {
233       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
234                            _("Output stream doesn’t implement write"));
235       return -1;
236     }
237   
238   if (!g_output_stream_set_pending (stream, error))
239     return -1;
240   
241   if (cancellable)
242     g_cancellable_push_current (cancellable);
243   
244   res = class->write_fn (stream, buffer, count, cancellable, error);
245   
246   if (cancellable)
247     g_cancellable_pop_current (cancellable);
248   
249   g_output_stream_clear_pending (stream);
250
251   return res; 
252 }
253
254 /**
255  * g_output_stream_write_all:
256  * @stream: a #GOutputStream.
257  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
258  * @count: the number of bytes to write
259  * @bytes_written: (out) (optional): location to store the number of bytes that was
260  *     written to the stream
261  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
262  * @error: location to store the error occurring, or %NULL to ignore
263  *
264  * Tries to write @count bytes from @buffer into the stream. Will block
265  * during the operation.
266  * 
267  * This function is similar to g_output_stream_write(), except it tries to
268  * write as many bytes as requested, only stopping on an error.
269  *
270  * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
271  * is set to @count.
272  * 
273  * If there is an error during the operation %FALSE is returned and @error
274  * is set to indicate the error status.
275  *
276  * As a special exception to the normal conventions for functions that
277  * use #GError, if this function returns %FALSE (and sets @error) then
278  * @bytes_written will be set to the number of bytes that were
279  * successfully written before the error was encountered.  This
280  * functionality is only available from C.  If you need it from another
281  * language then you must write your own loop around
282  * g_output_stream_write().
283  *
284  * Returns: %TRUE on success, %FALSE if there was an error
285  **/
286 gboolean
287 g_output_stream_write_all (GOutputStream  *stream,
288                            const void     *buffer,
289                            gsize           count,
290                            gsize          *bytes_written,
291                            GCancellable   *cancellable,
292                            GError        **error)
293 {
294   gsize _bytes_written;
295   gssize res;
296
297   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
298   g_return_val_if_fail (buffer != NULL || count == 0, FALSE);
299
300   _bytes_written = 0;
301   while (_bytes_written < count)
302     {
303       res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
304                                    cancellable, error);
305       if (res == -1)
306         {
307           if (bytes_written)
308             *bytes_written = _bytes_written;
309           return FALSE;
310         }
311       g_return_val_if_fail (res > 0, FALSE);
312
313       _bytes_written += res;
314     }
315   
316   if (bytes_written)
317     *bytes_written = _bytes_written;
318
319   return TRUE;
320 }
321
322 /**
323  * g_output_stream_writev:
324  * @stream: a #GOutputStream.
325  * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
326  * @n_vectors: the number of vectors to write
327  * @bytes_written: (out) (optional): location to store the number of bytes that were
328  *     written to the stream
329  * @cancellable: (nullable): optional cancellable object
330  * @error: location to store the error occurring, or %NULL to ignore
331  *
332  * Tries to write the bytes contained in the @n_vectors @vectors into the
333  * stream. Will block during the operation.
334  *
335  * If @n_vectors is 0 or the sum of all bytes in @vectors is 0, returns 0 and
336  * does nothing.
337  *
338  * On success, the number of bytes written to the stream is returned.
339  * It is not an error if this is not the same as the requested size, as it
340  * can happen e.g. on a partial I/O error, or if there is not enough
341  * storage in the stream. All writes block until at least one byte
342  * is written or an error occurs; 0 is never returned (unless
343  * @n_vectors is 0 or the sum of all bytes in @vectors is 0).
344  *
345  * If @cancellable is not %NULL, then the operation can be cancelled by
346  * triggering the cancellable object from another thread. If the operation
347  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
348  * operation was partially finished when the operation was cancelled the
349  * partial result will be returned, without an error.
350  *
351  * Some implementations of g_output_stream_writev() may have limitations on the
352  * aggregate buffer size, and will return %G_IO_ERROR_INVALID_ARGUMENT if these
353  * are exceeded. For example, when writing to a local file on UNIX platforms,
354  * the aggregate buffer size must not exceed %G_MAXSSIZE bytes.
355  *
356  * Virtual: writev_fn
357  *
358  * Returns: %TRUE on success, %FALSE if there was an error
359  *
360  * Since: 2.60
361  */
362 gboolean
363 g_output_stream_writev (GOutputStream        *stream,
364                         const GOutputVector  *vectors,
365                         gsize                 n_vectors,
366                         gsize                *bytes_written,
367                         GCancellable         *cancellable,
368                         GError              **error)
369 {
370   GOutputStreamClass *class;
371   gboolean res;
372   gsize _bytes_written = 0;
373
374   if (bytes_written)
375     *bytes_written = 0;
376
377   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
378   g_return_val_if_fail (vectors != NULL || n_vectors == 0, FALSE);
379   g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
380   g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
381
382   if (n_vectors == 0)
383     return TRUE;
384
385   class = G_OUTPUT_STREAM_GET_CLASS (stream);
386
387   g_return_val_if_fail (class->writev_fn != NULL, FALSE);
388
389   if (!g_output_stream_set_pending (stream, error))
390     return FALSE;
391
392   if (cancellable)
393     g_cancellable_push_current (cancellable);
394
395   res = class->writev_fn (stream, vectors, n_vectors, &_bytes_written, cancellable, error);
396
397   g_warn_if_fail (res || _bytes_written == 0);
398   g_warn_if_fail (res || (error == NULL || *error != NULL));
399
400   if (cancellable)
401     g_cancellable_pop_current (cancellable);
402
403   g_output_stream_clear_pending (stream);
404
405   if (bytes_written)
406     *bytes_written = _bytes_written;
407
408   return res;
409 }
410
411 /**
412  * g_output_stream_writev_all:
413  * @stream: a #GOutputStream.
414  * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
415  * @n_vectors: the number of vectors to write
416  * @bytes_written: (out) (optional): location to store the number of bytes that were
417  *     written to the stream
418  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
419  * @error: location to store the error occurring, or %NULL to ignore
420  *
421  * Tries to write the bytes contained in the @n_vectors @vectors into the
422  * stream. Will block during the operation.
423  *
424  * This function is similar to g_output_stream_writev(), except it tries to
425  * write as many bytes as requested, only stopping on an error.
426  *
427  * On a successful write of all @n_vectors vectors, %TRUE is returned, and
428  * @bytes_written is set to the sum of all the sizes of @vectors.
429  *
430  * If there is an error during the operation %FALSE is returned and @error
431  * is set to indicate the error status.
432  *
433  * As a special exception to the normal conventions for functions that
434  * use #GError, if this function returns %FALSE (and sets @error) then
435  * @bytes_written will be set to the number of bytes that were
436  * successfully written before the error was encountered.  This
437  * functionality is only available from C. If you need it from another
438  * language then you must write your own loop around
439  * g_output_stream_write().
440  *
441  * The content of the individual elements of @vectors might be changed by this
442  * function.
443  *
444  * Returns: %TRUE on success, %FALSE if there was an error
445  *
446  * Since: 2.60
447  */
448 gboolean
449 g_output_stream_writev_all (GOutputStream  *stream,
450                             GOutputVector  *vectors,
451                             gsize           n_vectors,
452                             gsize          *bytes_written,
453                             GCancellable   *cancellable,
454                             GError        **error)
455 {
456   gsize _bytes_written = 0;
457   gsize i, to_be_written = 0;
458
459   if (bytes_written)
460     *bytes_written = 0;
461
462   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
463   g_return_val_if_fail (vectors != NULL || n_vectors == 0, FALSE);
464   g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
465   g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
466
467   /* We can't write more than G_MAXSIZE bytes overall, otherwise we
468    * would overflow the bytes_written counter */
469   for (i = 0; i < n_vectors; i++)
470     {
471        if (to_be_written > G_MAXSIZE - vectors[i].size)
472          {
473            g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
474                         _("Sum of vectors passed to %s too large"), G_STRFUNC);
475            return FALSE;
476          }
477        to_be_written += vectors[i].size;
478     }
479
480   _bytes_written = 0;
481   while (n_vectors > 0 && to_be_written > 0)
482     {
483       gsize n_written = 0;
484       gboolean res;
485
486       res = g_output_stream_writev (stream, vectors, n_vectors, &n_written, cancellable, error);
487
488       if (!res)
489         {
490           if (bytes_written)
491             *bytes_written = _bytes_written;
492           return FALSE;
493         }
494
495       g_return_val_if_fail (n_written > 0, FALSE);
496       _bytes_written += n_written;
497
498       /* skip vectors that have been written in full */
499       while (n_vectors > 0 && n_written >= vectors[0].size)
500         {
501           n_written -= vectors[0].size;
502           ++vectors;
503           --n_vectors;
504         }
505       /* skip partially written vector data */
506       if (n_written > 0 && n_vectors > 0)
507         {
508           vectors[0].size -= n_written;
509           vectors[0].buffer = ((guint8 *) vectors[0].buffer) + n_written;
510         }
511     }
512
513   if (bytes_written)
514     *bytes_written = _bytes_written;
515
516   return TRUE;
517 }
518
519 /**
520  * g_output_stream_printf:
521  * @stream: a #GOutputStream.
522  * @bytes_written: (out) (optional): location to store the number of bytes that was
523  *     written to the stream
524  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
525  * @error: location to store the error occurring, or %NULL to ignore
526  * @format: the format string. See the printf() documentation
527  * @...: the parameters to insert into the format string
528  *
529  * This is a utility function around g_output_stream_write_all(). It
530  * uses g_strdup_vprintf() to turn @format and @... into a string that
531  * is then written to @stream.
532  *
533  * See the documentation of g_output_stream_write_all() about the
534  * behavior of the actual write operation.
535  *
536  * Note that partial writes cannot be properly checked with this
537  * function due to the variable length of the written string, if you
538  * need precise control over partial write failures, you need to
539  * create you own printf()-like wrapper around g_output_stream_write()
540  * or g_output_stream_write_all().
541  *
542  * Since: 2.40
543  *
544  * Returns: %TRUE on success, %FALSE if there was an error
545  **/
546 gboolean
547 g_output_stream_printf (GOutputStream  *stream,
548                         gsize          *bytes_written,
549                         GCancellable   *cancellable,
550                         GError        **error,
551                         const gchar    *format,
552                         ...)
553 {
554   va_list  args;
555   gboolean success;
556
557   va_start (args, format);
558   success = g_output_stream_vprintf (stream, bytes_written, cancellable,
559                                      error, format, args);
560   va_end (args);
561
562   return success;
563 }
564
565 /**
566  * g_output_stream_vprintf:
567  * @stream: a #GOutputStream.
568  * @bytes_written: (out) (optional): location to store the number of bytes that was
569  *     written to the stream
570  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
571  * @error: location to store the error occurring, or %NULL to ignore
572  * @format: the format string. See the printf() documentation
573  * @args: the parameters to insert into the format string
574  *
575  * This is a utility function around g_output_stream_write_all(). It
576  * uses g_strdup_vprintf() to turn @format and @args into a string that
577  * is then written to @stream.
578  *
579  * See the documentation of g_output_stream_write_all() about the
580  * behavior of the actual write operation.
581  *
582  * Note that partial writes cannot be properly checked with this
583  * function due to the variable length of the written string, if you
584  * need precise control over partial write failures, you need to
585  * create you own printf()-like wrapper around g_output_stream_write()
586  * or g_output_stream_write_all().
587  *
588  * Since: 2.40
589  *
590  * Returns: %TRUE on success, %FALSE if there was an error
591  **/
592 gboolean
593 g_output_stream_vprintf (GOutputStream  *stream,
594                          gsize          *bytes_written,
595                          GCancellable   *cancellable,
596                          GError        **error,
597                          const gchar    *format,
598                          va_list         args)
599 {
600   gchar    *text;
601   gboolean  success;
602
603   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
604   g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
605   g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
606   g_return_val_if_fail (format != NULL, FALSE);
607
608   text = g_strdup_vprintf (format, args);
609   success = g_output_stream_write_all (stream,
610                                        text, strlen (text),
611                                        bytes_written, cancellable, error);
612   g_free (text);
613
614   return success;
615 }
616
617 /**
618  * g_output_stream_write_bytes:
619  * @stream: a #GOutputStream.
620  * @bytes: the #GBytes to write
621  * @cancellable: (nullable): optional cancellable object
622  * @error: location to store the error occurring, or %NULL to ignore
623  *
624  * A wrapper function for g_output_stream_write() which takes a
625  * #GBytes as input.  This can be more convenient for use by language
626  * bindings or in other cases where the refcounted nature of #GBytes
627  * is helpful over a bare pointer interface.
628  *
629  * However, note that this function may still perform partial writes,
630  * just like g_output_stream_write().  If that occurs, to continue
631  * writing, you will need to create a new #GBytes containing just the
632  * remaining bytes, using g_bytes_new_from_bytes(). Passing the same
633  * #GBytes instance multiple times potentially can result in duplicated
634  * data in the output stream.
635  *
636  * Returns: Number of bytes written, or -1 on error
637  **/
638 gssize
639 g_output_stream_write_bytes (GOutputStream  *stream,
640                              GBytes         *bytes,
641                              GCancellable   *cancellable,
642                              GError        **error)
643 {
644   gsize size;
645   gconstpointer data;
646
647   data = g_bytes_get_data (bytes, &size);
648
649   return g_output_stream_write (stream,
650                                 data, size,
651                                 cancellable,
652                                 error);
653 }
654
655 /**
656  * g_output_stream_flush:
657  * @stream: a #GOutputStream.
658  * @cancellable: (nullable): optional cancellable object
659  * @error: location to store the error occurring, or %NULL to ignore
660  *
661  * Forces a write of all user-space buffered data for the given
662  * @stream. Will block during the operation. Closing the stream will
663  * implicitly cause a flush.
664  *
665  * This function is optional for inherited classes.
666  * 
667  * If @cancellable is not %NULL, then the operation can be cancelled by
668  * triggering the cancellable object from another thread. If the operation
669  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
670  *
671  * Returns: %TRUE on success, %FALSE on error
672  **/
673 gboolean
674 g_output_stream_flush (GOutputStream  *stream,
675                        GCancellable   *cancellable,
676                        GError        **error)
677 {
678   GOutputStreamClass *class;
679   gboolean res;
680
681   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
682
683   if (!g_output_stream_set_pending (stream, error))
684     return FALSE;
685   
686   class = G_OUTPUT_STREAM_GET_CLASS (stream);
687
688   res = TRUE;
689   if (class->flush)
690     {
691       if (cancellable)
692         g_cancellable_push_current (cancellable);
693       
694       res = class->flush (stream, cancellable, error);
695       
696       if (cancellable)
697         g_cancellable_pop_current (cancellable);
698     }
699   
700   g_output_stream_clear_pending (stream);
701
702   return res;
703 }
704
705 /**
706  * g_output_stream_splice:
707  * @stream: a #GOutputStream.
708  * @source: a #GInputStream.
709  * @flags: a set of #GOutputStreamSpliceFlags.
710  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
711  * @error: a #GError location to store the error occurring, or %NULL to
712  * ignore.
713  *
714  * Splices an input stream into an output stream.
715  *
716  * Returns: a #gssize containing the size of the data spliced, or
717  *     -1 if an error occurred. Note that if the number of bytes
718  *     spliced is greater than %G_MAXSSIZE, then that will be
719  *     returned, and there is no way to determine the actual number
720  *     of bytes spliced.
721  **/
722 gssize
723 g_output_stream_splice (GOutputStream             *stream,
724                         GInputStream              *source,
725                         GOutputStreamSpliceFlags   flags,
726                         GCancellable              *cancellable,
727                         GError                   **error)
728 {
729   GOutputStreamClass *class;
730   gssize bytes_copied;
731
732   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
733   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
734
735   if (g_input_stream_is_closed (source))
736     {
737       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
738                            _("Source stream is already closed"));
739       return -1;
740     }
741
742   if (!g_output_stream_set_pending (stream, error))
743     return -1;
744
745   class = G_OUTPUT_STREAM_GET_CLASS (stream);
746
747   if (cancellable)
748     g_cancellable_push_current (cancellable);
749
750   bytes_copied = class->splice (stream, source, flags, cancellable, error);
751
752   if (cancellable)
753     g_cancellable_pop_current (cancellable);
754
755   g_output_stream_clear_pending (stream);
756
757   return bytes_copied;
758 }
759
760 static gssize
761 g_output_stream_real_splice (GOutputStream             *stream,
762                              GInputStream              *source,
763                              GOutputStreamSpliceFlags   flags,
764                              GCancellable              *cancellable,
765                              GError                   **error)
766 {
767   GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
768   gssize n_read, n_written;
769   gsize bytes_copied;
770   char buffer[8192], *p;
771   gboolean res;
772
773   bytes_copied = 0;
774   if (class->write_fn == NULL)
775     {
776       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
777                            _("Output stream doesn’t implement write"));
778       res = FALSE;
779       goto notsupported;
780     }
781
782   res = TRUE;
783   do
784     {
785       n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
786       if (n_read == -1)
787         {
788           res = FALSE;
789           break;
790         }
791
792       if (n_read == 0)
793         break;
794
795       p = buffer;
796       while (n_read > 0)
797         {
798           n_written = class->write_fn (stream, p, n_read, cancellable, error);
799           if (n_written == -1)
800             {
801               res = FALSE;
802               break;
803             }
804
805           p += n_written;
806           n_read -= n_written;
807           bytes_copied += n_written;
808         }
809
810       if (bytes_copied > G_MAXSSIZE)
811         bytes_copied = G_MAXSSIZE;
812     }
813   while (res);
814
815  notsupported:
816   if (!res)
817     error = NULL; /* Ignore further errors */
818
819   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
820     {
821       /* Don't care about errors in source here */
822       g_input_stream_close (source, cancellable, NULL);
823     }
824
825   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
826     {
827       /* But write errors on close are bad! */
828       if (!g_output_stream_internal_close (stream, cancellable, error))
829         res = FALSE;
830     }
831
832   if (res)
833     return bytes_copied;
834
835   return -1;
836 }
837
838 /* Must always be called inside
839  * g_output_stream_set_pending()/g_output_stream_clear_pending(). */
840 static gboolean
841 g_output_stream_internal_close (GOutputStream  *stream,
842                                 GCancellable   *cancellable,
843                                 GError        **error)
844 {
845   GOutputStreamClass *class;
846   gboolean res;
847
848   if (stream->priv->closed)
849     return TRUE;
850
851   class = G_OUTPUT_STREAM_GET_CLASS (stream);
852
853   stream->priv->closing = TRUE;
854
855   if (cancellable)
856     g_cancellable_push_current (cancellable);
857
858   if (class->flush)
859     res = class->flush (stream, cancellable, error);
860   else
861     res = TRUE;
862
863   if (!res)
864     {
865       /* flushing caused the error that we want to return,
866        * but we still want to close the underlying stream if possible
867        */
868       if (class->close_fn)
869         class->close_fn (stream, cancellable, NULL);
870     }
871   else
872     {
873       res = TRUE;
874       if (class->close_fn)
875         res = class->close_fn (stream, cancellable, error);
876     }
877
878   if (cancellable)
879     g_cancellable_pop_current (cancellable);
880
881   stream->priv->closing = FALSE;
882   stream->priv->closed = TRUE;
883
884   return res;
885 }
886
887 /**
888  * g_output_stream_close:
889  * @stream: A #GOutputStream.
890  * @cancellable: (nullable): optional cancellable object
891  * @error: location to store the error occurring, or %NULL to ignore
892  *
893  * Closes the stream, releasing resources related to it.
894  *
895  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
896  * Closing a stream multiple times will not return an error.
897  *
898  * Closing a stream will automatically flush any outstanding buffers in the
899  * stream.
900  *
901  * Streams will be automatically closed when the last reference
902  * is dropped, but you might want to call this function to make sure 
903  * resources are released as early as possible.
904  *
905  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
906  * open after the stream is closed. See the documentation for the individual
907  * stream for details.
908  *
909  * On failure the first error that happened will be reported, but the close
910  * operation will finish as much as possible. A stream that failed to
911  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
912  * is important to check and report the error to the user, otherwise
913  * there might be a loss of data as all data might not be written.
914  * 
915  * If @cancellable is not %NULL, then the operation can be cancelled by
916  * triggering the cancellable object from another thread. If the operation
917  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
918  * Cancelling a close will still leave the stream closed, but there some streams
919  * can use a faster close that doesn't block to e.g. check errors. On
920  * cancellation (as with any error) there is no guarantee that all written
921  * data will reach the target. 
922  *
923  * Returns: %TRUE on success, %FALSE on failure
924  **/
925 gboolean
926 g_output_stream_close (GOutputStream  *stream,
927                        GCancellable   *cancellable,
928                        GError        **error)
929 {
930   gboolean res;
931
932   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
933
934   if (stream->priv->closed)
935     return TRUE;
936
937   if (!g_output_stream_set_pending (stream, error))
938     return FALSE;
939
940   res = g_output_stream_internal_close (stream, cancellable, error);
941
942   g_output_stream_clear_pending (stream);
943   
944   return res;
945 }
946
947 static void
948 async_ready_write_callback_wrapper (GObject      *source_object,
949                                     GAsyncResult *res,
950                                     gpointer      user_data)
951 {
952   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
953   GOutputStreamClass *class;
954   GTask *task = user_data;
955   gssize nwrote;
956   GError *error = NULL;
957
958   g_output_stream_clear_pending (stream);
959   
960   if (g_async_result_legacy_propagate_error (res, &error))
961     nwrote = -1;
962   else
963     {
964       class = G_OUTPUT_STREAM_GET_CLASS (stream);
965       nwrote = class->write_finish (stream, res, &error);
966     }
967
968   if (nwrote >= 0)
969     g_task_return_int (task, nwrote);
970   else
971     g_task_return_error (task, error);
972   g_object_unref (task);
973 }
974
975 /**
976  * g_output_stream_write_async:
977  * @stream: A #GOutputStream.
978  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
979  * @count: the number of bytes to write
980  * @io_priority: the io priority of the request.
981  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
982  * @callback: (scope async): a #GAsyncReadyCallback
983  *     to call when the request is satisfied
984  * @user_data: the data to pass to callback function
985  *
986  * Request an asynchronous write of @count bytes from @buffer into 
987  * the stream. When the operation is finished @callback will be called.
988  * You can then call g_output_stream_write_finish() to get the result of the 
989  * operation.
990  *
991  * During an async request no other sync and async calls are allowed, 
992  * and will result in %G_IO_ERROR_PENDING errors. 
993  *
994  * A value of @count larger than %G_MAXSSIZE will cause a 
995  * %G_IO_ERROR_INVALID_ARGUMENT error.
996  *
997  * On success, the number of bytes written will be passed to the
998  * @callback. It is not an error if this is not the same as the 
999  * requested size, as it can happen e.g. on a partial I/O error, 
1000  * but generally we try to write as many bytes as requested. 
1001  *
1002  * You are guaranteed that this method will never fail with
1003  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
1004  * method will just wait until this changes.
1005  *
1006  * Any outstanding I/O request with higher priority (lower numerical 
1007  * value) will be executed before an outstanding request with lower 
1008  * priority. Default priority is %G_PRIORITY_DEFAULT.
1009  *
1010  * The asynchronous methods have a default fallback that uses threads
1011  * to implement asynchronicity, so they are optional for inheriting 
1012  * classes. However, if you override one you must override all.
1013  *
1014  * For the synchronous, blocking version of this function, see 
1015  * g_output_stream_write().
1016  *
1017  * Note that no copy of @buffer will be made, so it must stay valid
1018  * until @callback is called. See g_output_stream_write_bytes_async()
1019  * for a #GBytes version that will automatically hold a reference to
1020  * the contents (without copying) for the duration of the call.
1021  */
1022 void
1023 g_output_stream_write_async (GOutputStream       *stream,
1024                              const void          *buffer,
1025                              gsize                count,
1026                              int                  io_priority,
1027                              GCancellable        *cancellable,
1028                              GAsyncReadyCallback  callback,
1029                              gpointer             user_data)
1030 {
1031   GOutputStreamClass *class;
1032   GError *error = NULL;
1033   GTask *task;
1034
1035   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1036   g_return_if_fail (buffer != NULL);
1037
1038   task = g_task_new (stream, cancellable, callback, user_data);
1039   g_task_set_source_tag (task, g_output_stream_write_async);
1040   g_task_set_priority (task, io_priority);
1041
1042   if (count == 0)
1043     {
1044       g_task_return_int (task, 0);
1045       g_object_unref (task);
1046       return;
1047     }
1048
1049   if (((gssize) count) < 0)
1050     {
1051       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1052                                _("Too large count value passed to %s"),
1053                                G_STRFUNC);
1054       g_object_unref (task);
1055       return;
1056     }
1057
1058   if (!g_output_stream_set_pending (stream, &error))
1059     {
1060       g_task_return_error (task, error);
1061       g_object_unref (task);
1062       return;
1063     }
1064   
1065   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1066
1067   class->write_async (stream, buffer, count, io_priority, cancellable,
1068                       async_ready_write_callback_wrapper, task);
1069 }
1070
1071 /**
1072  * g_output_stream_write_finish:
1073  * @stream: a #GOutputStream.
1074  * @result: a #GAsyncResult.
1075  * @error: a #GError location to store the error occurring, or %NULL to 
1076  * ignore.
1077  * 
1078  * Finishes a stream write operation.
1079  * 
1080  * Returns: a #gssize containing the number of bytes written to the stream.
1081  **/
1082 gssize
1083 g_output_stream_write_finish (GOutputStream  *stream,
1084                               GAsyncResult   *result,
1085                               GError        **error)
1086 {
1087   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1088   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1089   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_write_async), FALSE);
1090
1091   /* @result is always the GTask created by g_output_stream_write_async();
1092    * we called class->write_finish() from async_ready_write_callback_wrapper.
1093    */
1094   return g_task_propagate_int (G_TASK (result), error);
1095 }
1096
1097 typedef struct
1098 {
1099   const guint8 *buffer;
1100   gsize to_write;
1101   gsize bytes_written;
1102 } AsyncWriteAll;
1103
1104 static void
1105 free_async_write_all (gpointer data)
1106 {
1107   g_slice_free (AsyncWriteAll, data);
1108 }
1109
1110 static void
1111 write_all_callback (GObject      *stream,
1112                     GAsyncResult *result,
1113                     gpointer      user_data)
1114 {
1115   GTask *task = user_data;
1116   AsyncWriteAll *data = g_task_get_task_data (task);
1117
1118   if (result)
1119     {
1120       GError *error = NULL;
1121       gssize nwritten;
1122
1123       nwritten = g_output_stream_write_finish (G_OUTPUT_STREAM (stream), result, &error);
1124
1125       if (nwritten == -1)
1126         {
1127           g_task_return_error (task, error);
1128           g_object_unref (task);
1129           return;
1130         }
1131
1132       g_assert_cmpint (nwritten, <=, data->to_write);
1133       g_warn_if_fail (nwritten > 0);
1134
1135       data->to_write -= nwritten;
1136       data->bytes_written += nwritten;
1137     }
1138
1139   if (data->to_write == 0)
1140     {
1141       g_task_return_boolean (task, TRUE);
1142       g_object_unref (task);
1143     }
1144   else
1145     g_output_stream_write_async (G_OUTPUT_STREAM (stream),
1146                                  data->buffer + data->bytes_written,
1147                                  data->to_write,
1148                                  g_task_get_priority (task),
1149                                  g_task_get_cancellable (task),
1150                                  write_all_callback, task);
1151 }
1152
1153 static void
1154 write_all_async_thread (GTask        *task,
1155                         gpointer      source_object,
1156                         gpointer      task_data,
1157                         GCancellable *cancellable)
1158 {
1159   GOutputStream *stream = source_object;
1160   AsyncWriteAll *data = task_data;
1161   GError *error = NULL;
1162
1163   if (g_output_stream_write_all (stream, data->buffer, data->to_write, &data->bytes_written,
1164                                  g_task_get_cancellable (task), &error))
1165     g_task_return_boolean (task, TRUE);
1166   else
1167     g_task_return_error (task, error);
1168 }
1169
1170 /**
1171  * g_output_stream_write_all_async:
1172  * @stream: A #GOutputStream
1173  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write
1174  * @count: the number of bytes to write
1175  * @io_priority: the io priority of the request
1176  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
1177  * @callback: (scope async): a #GAsyncReadyCallback
1178  *     to call when the request is satisfied
1179  * @user_data: the data to pass to callback function
1180  *
1181  * Request an asynchronous write of @count bytes from @buffer into
1182  * the stream. When the operation is finished @callback will be called.
1183  * You can then call g_output_stream_write_all_finish() to get the result of the
1184  * operation.
1185  *
1186  * This is the asynchronous version of g_output_stream_write_all().
1187  *
1188  * Call g_output_stream_write_all_finish() to collect the result.
1189  *
1190  * Any outstanding I/O request with higher priority (lower numerical
1191  * value) will be executed before an outstanding request with lower
1192  * priority. Default priority is %G_PRIORITY_DEFAULT.
1193  *
1194  * Note that no copy of @buffer will be made, so it must stay valid
1195  * until @callback is called.
1196  *
1197  * Since: 2.44
1198  */
1199 void
1200 g_output_stream_write_all_async (GOutputStream       *stream,
1201                                  const void          *buffer,
1202                                  gsize                count,
1203                                  int                  io_priority,
1204                                  GCancellable        *cancellable,
1205                                  GAsyncReadyCallback  callback,
1206                                  gpointer             user_data)
1207 {
1208   AsyncWriteAll *data;
1209   GTask *task;
1210
1211   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1212   g_return_if_fail (buffer != NULL || count == 0);
1213
1214   task = g_task_new (stream, cancellable, callback, user_data);
1215   data = g_slice_new0 (AsyncWriteAll);
1216   data->buffer = buffer;
1217   data->to_write = count;
1218
1219   g_task_set_source_tag (task, g_output_stream_write_all_async);
1220   g_task_set_task_data (task, data, free_async_write_all);
1221   g_task_set_priority (task, io_priority);
1222
1223   /* If async writes are going to be handled via the threadpool anyway
1224    * then we may as well do it with a single dispatch instead of
1225    * bouncing in and out.
1226    */
1227   if (g_output_stream_async_write_is_via_threads (stream))
1228     {
1229       g_task_run_in_thread (task, write_all_async_thread);
1230       g_object_unref (task);
1231     }
1232   else
1233     write_all_callback (G_OBJECT (stream), NULL, task);
1234 }
1235
1236 /**
1237  * g_output_stream_write_all_finish:
1238  * @stream: a #GOutputStream
1239  * @result: a #GAsyncResult
1240  * @bytes_written: (out) (optional): location to store the number of bytes that was written to the stream
1241  * @error: a #GError location to store the error occurring, or %NULL to ignore.
1242  *
1243  * Finishes an asynchronous stream write operation started with
1244  * g_output_stream_write_all_async().
1245  *
1246  * As a special exception to the normal conventions for functions that
1247  * use #GError, if this function returns %FALSE (and sets @error) then
1248  * @bytes_written will be set to the number of bytes that were
1249  * successfully written before the error was encountered.  This
1250  * functionality is only available from C.  If you need it from another
1251  * language then you must write your own loop around
1252  * g_output_stream_write_async().
1253  *
1254  * Returns: %TRUE on success, %FALSE if there was an error
1255  *
1256  * Since: 2.44
1257  **/
1258 gboolean
1259 g_output_stream_write_all_finish (GOutputStream  *stream,
1260                                   GAsyncResult   *result,
1261                                   gsize          *bytes_written,
1262                                   GError        **error)
1263 {
1264   GTask *task;
1265
1266   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1267   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1268
1269   task = G_TASK (result);
1270
1271   if (bytes_written)
1272     {
1273       AsyncWriteAll *data = (AsyncWriteAll *)g_task_get_task_data (task);
1274
1275       *bytes_written = data->bytes_written;
1276     }
1277
1278   return g_task_propagate_boolean (task, error);
1279 }
1280
1281 /**
1282  * g_output_stream_writev_async:
1283  * @stream: A #GOutputStream.
1284  * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
1285  * @n_vectors: the number of vectors to write
1286  * @io_priority: the I/O priority of the request.
1287  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
1288  * @callback: (scope async): a #GAsyncReadyCallback
1289  *     to call when the request is satisfied
1290  * @user_data: the data to pass to callback function
1291  *
1292  * Request an asynchronous write of the bytes contained in @n_vectors @vectors into
1293  * the stream. When the operation is finished @callback will be called.
1294  * You can then call g_output_stream_writev_finish() to get the result of the
1295  * operation.
1296  *
1297  * During an async request no other sync and async calls are allowed,
1298  * and will result in %G_IO_ERROR_PENDING errors.
1299  *
1300  * On success, the number of bytes written will be passed to the
1301  * @callback. It is not an error if this is not the same as the
1302  * requested size, as it can happen e.g. on a partial I/O error,
1303  * but generally we try to write as many bytes as requested.
1304  *
1305  * You are guaranteed that this method will never fail with
1306  * %G_IO_ERROR_WOULD_BLOCK â€” if @stream can't accept more data, the
1307  * method will just wait until this changes.
1308  *
1309  * Any outstanding I/O request with higher priority (lower numerical
1310  * value) will be executed before an outstanding request with lower
1311  * priority. Default priority is %G_PRIORITY_DEFAULT.
1312  *
1313  * The asynchronous methods have a default fallback that uses threads
1314  * to implement asynchronicity, so they are optional for inheriting
1315  * classes. However, if you override one you must override all.
1316  *
1317  * For the synchronous, blocking version of this function, see
1318  * g_output_stream_writev().
1319  *
1320  * Note that no copy of @vectors will be made, so it must stay valid
1321  * until @callback is called.
1322  *
1323  * Since: 2.60
1324  */
1325 void
1326 g_output_stream_writev_async (GOutputStream             *stream,
1327                               const GOutputVector       *vectors,
1328                               gsize                      n_vectors,
1329                               int                        io_priority,
1330                               GCancellable              *cancellable,
1331                               GAsyncReadyCallback        callback,
1332                               gpointer                   user_data)
1333 {
1334   GOutputStreamClass *class;
1335
1336   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1337   g_return_if_fail (vectors != NULL || n_vectors == 0);
1338   g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
1339
1340   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1341   g_return_if_fail (class->writev_async != NULL);
1342
1343   class->writev_async (stream, vectors, n_vectors, io_priority, cancellable,
1344                        callback, user_data);
1345 }
1346
1347 /**
1348  * g_output_stream_writev_finish:
1349  * @stream: a #GOutputStream.
1350  * @result: a #GAsyncResult.
1351  * @bytes_written: (out) (optional): location to store the number of bytes that were written to the stream
1352  * @error: a #GError location to store the error occurring, or %NULL to
1353  * ignore.
1354  *
1355  * Finishes a stream writev operation.
1356  *
1357  * Returns: %TRUE on success, %FALSE if there was an error
1358  *
1359  * Since: 2.60
1360  */
1361 gboolean
1362 g_output_stream_writev_finish (GOutputStream  *stream,
1363                                GAsyncResult   *result,
1364                                gsize          *bytes_written,
1365                                GError        **error)
1366 {
1367   GOutputStreamClass *class;
1368   gboolean res;
1369   gsize _bytes_written = 0;
1370
1371   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1372   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1373   g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
1374
1375   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1376   g_return_val_if_fail (class->writev_finish != NULL, FALSE);
1377
1378   res = class->writev_finish (stream, result, &_bytes_written, error);
1379
1380   g_warn_if_fail (res || _bytes_written == 0);
1381   g_warn_if_fail (res || (error == NULL || *error != NULL));
1382
1383   if (bytes_written)
1384     *bytes_written = _bytes_written;
1385
1386   return res;
1387 }
1388
1389 typedef struct
1390 {
1391   GOutputVector *vectors;
1392   gsize n_vectors; /* (unowned) */
1393   gsize bytes_written;
1394 } AsyncWritevAll;
1395
1396 static void
1397 free_async_writev_all (gpointer data)
1398 {
1399   g_slice_free (AsyncWritevAll, data);
1400 }
1401
1402 static void
1403 writev_all_callback (GObject      *stream,
1404                      GAsyncResult *result,
1405                      gpointer      user_data)
1406 {
1407   GTask *task = user_data;
1408   AsyncWritevAll *data = g_task_get_task_data (task);
1409   gint priority = g_task_get_priority (task);
1410   GCancellable *cancellable = g_task_get_cancellable (task);
1411
1412   if (result)
1413     {
1414       GError *error = NULL;
1415       gboolean res;
1416       gsize n_written = 0;
1417
1418       res = g_output_stream_writev_finish (G_OUTPUT_STREAM (stream), result, &n_written, &error);
1419
1420       if (!res)
1421         {
1422           g_task_return_error (task, g_steal_pointer (&error));
1423           g_object_unref (task);
1424           return;
1425         }
1426
1427       g_warn_if_fail (n_written > 0);
1428       data->bytes_written += n_written;
1429
1430       /* skip vectors that have been written in full */
1431       while (data->n_vectors > 0 && n_written >= data->vectors[0].size)
1432         {
1433           n_written -= data->vectors[0].size;
1434           ++data->vectors;
1435           --data->n_vectors;
1436         }
1437       /* skip partially written vector data */
1438       if (n_written > 0 && data->n_vectors > 0)
1439         {
1440           data->vectors[0].size -= n_written;
1441           data->vectors[0].buffer = ((guint8 *) data->vectors[0].buffer) + n_written;
1442         }
1443     }
1444
1445   if (data->n_vectors == 0)
1446     {
1447       g_task_return_boolean (task, TRUE);
1448       g_object_unref (task);
1449     }
1450   else
1451     g_output_stream_writev_async (G_OUTPUT_STREAM (stream),
1452                                   data->vectors,
1453                                   data->n_vectors,
1454                                   priority,
1455                                   cancellable,
1456                                   writev_all_callback, g_steal_pointer (&task));
1457 }
1458
1459 static void
1460 writev_all_async_thread (GTask        *task,
1461                          gpointer      source_object,
1462                          gpointer      task_data,
1463                          GCancellable *cancellable)
1464 {
1465   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1466   AsyncWritevAll *data = task_data;
1467   GError *error = NULL;
1468
1469   if (g_output_stream_writev_all (stream, data->vectors, data->n_vectors, &data->bytes_written,
1470                                   g_task_get_cancellable (task), &error))
1471     g_task_return_boolean (task, TRUE);
1472   else
1473     g_task_return_error (task, g_steal_pointer (&error));
1474 }
1475
1476 /**
1477  * g_output_stream_writev_all_async:
1478  * @stream: A #GOutputStream
1479  * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
1480  * @n_vectors: the number of vectors to write
1481  * @io_priority: the I/O priority of the request
1482  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
1483  * @callback: (scope async): a #GAsyncReadyCallback
1484  *     to call when the request is satisfied
1485  * @user_data: the data to pass to callback function
1486  *
1487  * Request an asynchronous write of the bytes contained in the @n_vectors @vectors into
1488  * the stream. When the operation is finished @callback will be called.
1489  * You can then call g_output_stream_writev_all_finish() to get the result of the
1490  * operation.
1491  *
1492  * This is the asynchronous version of g_output_stream_writev_all().
1493  *
1494  * Call g_output_stream_writev_all_finish() to collect the result.
1495  *
1496  * Any outstanding I/O request with higher priority (lower numerical
1497  * value) will be executed before an outstanding request with lower
1498  * priority. Default priority is %G_PRIORITY_DEFAULT.
1499  *
1500  * Note that no copy of @vectors will be made, so it must stay valid
1501  * until @callback is called. The content of the individual elements
1502  * of @vectors might be changed by this function.
1503  *
1504  * Since: 2.60
1505  */
1506 void
1507 g_output_stream_writev_all_async (GOutputStream       *stream,
1508                                   GOutputVector       *vectors,
1509                                   gsize                n_vectors,
1510                                   int                  io_priority,
1511                                   GCancellable        *cancellable,
1512                                   GAsyncReadyCallback  callback,
1513                                   gpointer             user_data)
1514 {
1515   AsyncWritevAll *data;
1516   GTask *task;
1517   gsize i, to_be_written = 0;
1518
1519   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1520   g_return_if_fail (vectors != NULL || n_vectors == 0);
1521   g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
1522
1523   task = g_task_new (stream, cancellable, callback, user_data);
1524   data = g_slice_new0 (AsyncWritevAll);
1525   data->vectors = vectors;
1526   data->n_vectors = n_vectors;
1527
1528   g_task_set_source_tag (task, g_output_stream_writev_all_async);
1529   g_task_set_task_data (task, data, free_async_writev_all);
1530   g_task_set_priority (task, io_priority);
1531
1532   /* We can't write more than G_MAXSIZE bytes overall, otherwise we
1533    * would overflow the bytes_written counter */
1534   for (i = 0; i < n_vectors; i++)
1535     {
1536        if (to_be_written > G_MAXSIZE - vectors[i].size)
1537          {
1538            g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
1539                                     _("Sum of vectors passed to %s too large"),
1540                                     G_STRFUNC);
1541            g_object_unref (task);
1542            return;
1543          }
1544        to_be_written += vectors[i].size;
1545     }
1546
1547   /* If async writes are going to be handled via the threadpool anyway
1548    * then we may as well do it with a single dispatch instead of
1549    * bouncing in and out.
1550    */
1551   if (g_output_stream_async_writev_is_via_threads (stream))
1552     {
1553       g_task_run_in_thread (task, writev_all_async_thread);
1554       g_object_unref (task);
1555     }
1556   else
1557     writev_all_callback (G_OBJECT (stream), NULL, g_steal_pointer (&task));
1558 }
1559
1560 /**
1561  * g_output_stream_writev_all_finish:
1562  * @stream: a #GOutputStream
1563  * @result: a #GAsyncResult
1564  * @bytes_written: (out) (optional): location to store the number of bytes that were written to the stream
1565  * @error: a #GError location to store the error occurring, or %NULL to ignore.
1566  *
1567  * Finishes an asynchronous stream write operation started with
1568  * g_output_stream_writev_all_async().
1569  *
1570  * As a special exception to the normal conventions for functions that
1571  * use #GError, if this function returns %FALSE (and sets @error) then
1572  * @bytes_written will be set to the number of bytes that were
1573  * successfully written before the error was encountered.  This
1574  * functionality is only available from C.  If you need it from another
1575  * language then you must write your own loop around
1576  * g_output_stream_writev_async().
1577  *
1578  * Returns: %TRUE on success, %FALSE if there was an error
1579  *
1580  * Since: 2.60
1581  */
1582 gboolean
1583 g_output_stream_writev_all_finish (GOutputStream  *stream,
1584                                    GAsyncResult   *result,
1585                                    gsize          *bytes_written,
1586                                    GError        **error)
1587 {
1588   GTask *task;
1589
1590   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1591   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1592   g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
1593
1594   task = G_TASK (result);
1595
1596   if (bytes_written)
1597     {
1598       AsyncWritevAll *data = (AsyncWritevAll *)g_task_get_task_data (task);
1599
1600       *bytes_written = data->bytes_written;
1601     }
1602
1603   return g_task_propagate_boolean (task, error);
1604 }
1605
1606 static void
1607 write_bytes_callback (GObject      *stream,
1608                       GAsyncResult *result,
1609                       gpointer      user_data)
1610 {
1611   GTask *task = user_data;
1612   GError *error = NULL;
1613   gssize nwrote;
1614
1615   nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream),
1616                                          result, &error);
1617   if (nwrote == -1)
1618     g_task_return_error (task, error);
1619   else
1620     g_task_return_int (task, nwrote);
1621   g_object_unref (task);
1622 }
1623
1624 /**
1625  * g_output_stream_write_bytes_async:
1626  * @stream: A #GOutputStream.
1627  * @bytes: The bytes to write
1628  * @io_priority: the io priority of the request.
1629  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
1630  * @callback: (scope async) (closure user_data): a #GAsyncReadyCallback
1631  *   to call when the request is satisfied
1632  * @user_data: the data to pass to callback function
1633  *
1634  * This function is similar to g_output_stream_write_async(), but
1635  * takes a #GBytes as input.  Due to the refcounted nature of #GBytes,
1636  * this allows the stream to avoid taking a copy of the data.
1637  *
1638  * However, note that this function may still perform partial writes,
1639  * just like g_output_stream_write_async(). If that occurs, to continue
1640  * writing, you will need to create a new #GBytes containing just the
1641  * remaining bytes, using g_bytes_new_from_bytes(). Passing the same
1642  * #GBytes instance multiple times potentially can result in duplicated
1643  * data in the output stream.
1644  *
1645  * For the synchronous, blocking version of this function, see
1646  * g_output_stream_write_bytes().
1647  **/
1648 void
1649 g_output_stream_write_bytes_async (GOutputStream       *stream,
1650                                    GBytes              *bytes,
1651                                    int                  io_priority,
1652                                    GCancellable        *cancellable,
1653                                    GAsyncReadyCallback  callback,
1654                                    gpointer             user_data)
1655 {
1656   GTask *task;
1657   gsize size;
1658   gconstpointer data;
1659
1660   data = g_bytes_get_data (bytes, &size);
1661
1662   task = g_task_new (stream, cancellable, callback, user_data);
1663   g_task_set_source_tag (task, g_output_stream_write_bytes_async);
1664   g_task_set_task_data (task, g_bytes_ref (bytes),
1665                         (GDestroyNotify) g_bytes_unref);
1666
1667   g_output_stream_write_async (stream,
1668                                data, size,
1669                                io_priority,
1670                                cancellable,
1671                                write_bytes_callback,
1672                                task);
1673 }
1674
1675 /**
1676  * g_output_stream_write_bytes_finish:
1677  * @stream: a #GOutputStream.
1678  * @result: a #GAsyncResult.
1679  * @error: a #GError location to store the error occurring, or %NULL to
1680  * ignore.
1681  *
1682  * Finishes a stream write-from-#GBytes operation.
1683  *
1684  * Returns: a #gssize containing the number of bytes written to the stream.
1685  **/
1686 gssize
1687 g_output_stream_write_bytes_finish (GOutputStream  *stream,
1688                                     GAsyncResult   *result,
1689                                     GError        **error)
1690 {
1691   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
1692   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1693
1694   return g_task_propagate_int (G_TASK (result), error);
1695 }
1696
1697 static void
1698 async_ready_splice_callback_wrapper (GObject      *source_object,
1699                                      GAsyncResult *res,
1700                                      gpointer     _data)
1701 {
1702   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1703   GOutputStreamClass *class;
1704   GTask *task = _data;
1705   gssize nspliced;
1706   GError *error = NULL;
1707
1708   g_output_stream_clear_pending (stream);
1709   
1710   if (g_async_result_legacy_propagate_error (res, &error))
1711     nspliced = -1;
1712   else
1713     {
1714       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1715       nspliced = class->splice_finish (stream, res, &error);
1716     }
1717
1718   if (nspliced >= 0)
1719     g_task_return_int (task, nspliced);
1720   else
1721     g_task_return_error (task, error);
1722   g_object_unref (task);
1723 }
1724
1725 /**
1726  * g_output_stream_splice_async:
1727  * @stream: a #GOutputStream.
1728  * @source: a #GInputStream. 
1729  * @flags: a set of #GOutputStreamSpliceFlags.
1730  * @io_priority: the io priority of the request.
1731  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore. 
1732  * @callback: (scope async) (closure user_data): a #GAsyncReadyCallback
1733  *   to call when the request is satisfied
1734  * @user_data: the data to pass to callback function
1735  * 
1736  * Splices a stream asynchronously.
1737  * When the operation is finished @callback will be called.
1738  * You can then call g_output_stream_splice_finish() to get the 
1739  * result of the operation.
1740  *
1741  * For the synchronous, blocking version of this function, see 
1742  * g_output_stream_splice().
1743  **/
1744 void
1745 g_output_stream_splice_async (GOutputStream            *stream,
1746                               GInputStream             *source,
1747                               GOutputStreamSpliceFlags  flags,
1748                               int                       io_priority,
1749                               GCancellable             *cancellable,
1750                               GAsyncReadyCallback       callback,
1751                               gpointer                  user_data)
1752 {
1753   GOutputStreamClass *class;
1754   GTask *task;
1755   GError *error = NULL;
1756
1757   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1758   g_return_if_fail (G_IS_INPUT_STREAM (source));
1759
1760   task = g_task_new (stream, cancellable, callback, user_data);
1761   g_task_set_source_tag (task, g_output_stream_splice_async);
1762   g_task_set_priority (task, io_priority);
1763   g_task_set_task_data (task, g_object_ref (source), g_object_unref);
1764
1765   if (g_input_stream_is_closed (source))
1766     {
1767       g_task_return_new_error (task,
1768                                G_IO_ERROR, G_IO_ERROR_CLOSED,
1769                                _("Source stream is already closed"));
1770       g_object_unref (task);
1771       return;
1772     }
1773   
1774   if (!g_output_stream_set_pending (stream, &error))
1775     {
1776       g_task_return_error (task, error);
1777       g_object_unref (task);
1778       return;
1779     }
1780
1781   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1782
1783   class->splice_async (stream, source, flags, io_priority, cancellable,
1784                        async_ready_splice_callback_wrapper, task);
1785 }
1786
1787 /**
1788  * g_output_stream_splice_finish:
1789  * @stream: a #GOutputStream.
1790  * @result: a #GAsyncResult.
1791  * @error: a #GError location to store the error occurring, or %NULL to 
1792  * ignore.
1793  *
1794  * Finishes an asynchronous stream splice operation.
1795  * 
1796  * Returns: a #gssize of the number of bytes spliced. Note that if the
1797  *     number of bytes spliced is greater than %G_MAXSSIZE, then that
1798  *     will be returned, and there is no way to determine the actual
1799  *     number of bytes spliced.
1800  **/
1801 gssize
1802 g_output_stream_splice_finish (GOutputStream  *stream,
1803                                GAsyncResult   *result,
1804                                GError        **error)
1805 {
1806   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1807   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1808   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_splice_async), FALSE);
1809
1810   /* @result is always the GTask created by g_output_stream_splice_async();
1811    * we called class->splice_finish() from async_ready_splice_callback_wrapper.
1812    */
1813   return g_task_propagate_int (G_TASK (result), error);
1814 }
1815
1816 static void
1817 async_ready_flush_callback_wrapper (GObject      *source_object,
1818                                     GAsyncResult *res,
1819                                     gpointer      user_data)
1820 {
1821   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1822   GOutputStreamClass *class;
1823   GTask *task = user_data;
1824   gboolean flushed;
1825   GError *error = NULL;
1826
1827   g_output_stream_clear_pending (stream);
1828   
1829   if (g_async_result_legacy_propagate_error (res, &error))
1830     flushed = FALSE;
1831   else
1832     {
1833       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1834       flushed = class->flush_finish (stream, res, &error);
1835     }
1836
1837   if (flushed)
1838     g_task_return_boolean (task, TRUE);
1839   else
1840     g_task_return_error (task, error);
1841   g_object_unref (task);
1842 }
1843
1844 /**
1845  * g_output_stream_flush_async:
1846  * @stream: a #GOutputStream.
1847  * @io_priority: the io priority of the request.
1848  * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
1849  * @callback: (scope async) (closure user_data): a #GAsyncReadyCallback
1850  *   to call when the request is satisfied
1851  * @user_data: the data to pass to callback function
1852  * 
1853  * Forces an asynchronous write of all user-space buffered data for
1854  * the given @stream.
1855  * For behaviour details see g_output_stream_flush().
1856  *
1857  * When the operation is finished @callback will be 
1858  * called. You can then call g_output_stream_flush_finish() to get the 
1859  * result of the operation.
1860  **/
1861 void
1862 g_output_stream_flush_async (GOutputStream       *stream,
1863                              int                  io_priority,
1864                              GCancellable        *cancellable,
1865                              GAsyncReadyCallback  callback,
1866                              gpointer             user_data)
1867 {
1868   GOutputStreamClass *class;
1869   GTask *task;
1870   GError *error = NULL;
1871
1872   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1873
1874   task = g_task_new (stream, cancellable, callback, user_data);
1875   g_task_set_source_tag (task, g_output_stream_flush_async);
1876   g_task_set_priority (task, io_priority);
1877
1878   if (!g_output_stream_set_pending (stream, &error))
1879     {
1880       g_task_return_error (task, error);
1881       g_object_unref (task);
1882       return;
1883     }
1884
1885   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1886   
1887   if (class->flush_async == NULL)
1888     {
1889       g_output_stream_clear_pending (stream);
1890       g_task_return_boolean (task, TRUE);
1891       g_object_unref (task);
1892       return;
1893     }
1894       
1895   class->flush_async (stream, io_priority, cancellable,
1896                       async_ready_flush_callback_wrapper, task);
1897 }
1898
1899 /**
1900  * g_output_stream_flush_finish:
1901  * @stream: a #GOutputStream.
1902  * @result: a GAsyncResult.
1903  * @error: a #GError location to store the error occurring, or %NULL to 
1904  * ignore.
1905  * 
1906  * Finishes flushing an output stream.
1907  * 
1908  * Returns: %TRUE if flush operation succeeded, %FALSE otherwise.
1909  **/
1910 gboolean
1911 g_output_stream_flush_finish (GOutputStream  *stream,
1912                               GAsyncResult   *result,
1913                               GError        **error)
1914 {
1915   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1916   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1917   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_flush_async), FALSE);
1918
1919   /* @result is always the GTask created by g_output_stream_flush_async();
1920    * we called class->flush_finish() from async_ready_flush_callback_wrapper.
1921    */
1922   return g_task_propagate_boolean (G_TASK (result), error);
1923 }
1924
1925
1926 static void
1927 async_ready_close_callback_wrapper (GObject      *source_object,
1928                                     GAsyncResult *res,
1929                                     gpointer      user_data)
1930 {
1931   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1932   GOutputStreamClass *class;
1933   GTask *task = user_data;
1934   GError *error = g_task_get_task_data (task);
1935
1936   stream->priv->closing = FALSE;
1937   stream->priv->closed = TRUE;
1938
1939   if (!error && !g_async_result_legacy_propagate_error (res, &error))
1940     {
1941       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1942
1943       class->close_finish (stream, res,
1944                            error ? NULL : &error);
1945     }
1946
1947   if (error != NULL)
1948     g_task_return_error (task, error);
1949   else
1950     g_task_return_boolean (task, TRUE);
1951   g_object_unref (task);
1952 }
1953
1954 static void
1955 async_ready_close_flushed_callback_wrapper (GObject      *source_object,
1956                                             GAsyncResult *res,
1957                                             gpointer      user_data)
1958 {
1959   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1960   GOutputStreamClass *class;
1961   GTask *task = user_data;
1962   GError *error = NULL;
1963
1964   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1965
1966   if (!g_async_result_legacy_propagate_error (res, &error))
1967     {
1968       class->flush_finish (stream, res, &error);
1969     }
1970
1971   /* propagate the possible error */
1972   if (error)
1973     g_task_set_task_data (task, error, NULL);
1974
1975   /* we still close, even if there was a flush error */
1976   class->close_async (stream,
1977                       g_task_get_priority (task),
1978                       g_task_get_cancellable (task),
1979                       async_ready_close_callback_wrapper, task);
1980 }
1981
1982 static void
1983 real_close_async_cb (GObject      *source_object,
1984                      GAsyncResult *res,
1985                      gpointer      user_data)
1986 {
1987   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1988   GTask *task = user_data;
1989   GError *error = NULL;
1990   gboolean ret;
1991
1992   g_output_stream_clear_pending (stream);
1993
1994   ret = g_output_stream_internal_close_finish (stream, res, &error);
1995
1996   if (error != NULL)
1997     g_task_return_error (task, error);
1998   else
1999     g_task_return_boolean (task, ret);
2000
2001   g_object_unref (task);
2002 }
2003
2004 /**
2005  * g_output_stream_close_async:
2006  * @stream: A #GOutputStream.
2007  * @io_priority: the io priority of the request.
2008  * @cancellable: (nullable): optional cancellable object
2009  * @callback: (scope async) (closure user_data): a #GAsyncReadyCallback
2010  *   to call when the request is satisfied
2011  * @user_data: the data to pass to callback function
2012  *
2013  * Requests an asynchronous close of the stream, releasing resources 
2014  * related to it. When the operation is finished @callback will be 
2015  * called. You can then call g_output_stream_close_finish() to get 
2016  * the result of the operation.
2017  *
2018  * For behaviour details see g_output_stream_close().
2019  *
2020  * The asynchronous methods have a default fallback that uses threads
2021  * to implement asynchronicity, so they are optional for inheriting 
2022  * classes. However, if you override one you must override all.
2023  **/
2024 void
2025 g_output_stream_close_async (GOutputStream       *stream,
2026                              int                  io_priority,
2027                              GCancellable        *cancellable,
2028                              GAsyncReadyCallback  callback,
2029                              gpointer             user_data)
2030 {
2031   GTask *task;
2032   GError *error = NULL;
2033
2034   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
2035   
2036   task = g_task_new (stream, cancellable, callback, user_data);
2037   g_task_set_source_tag (task, g_output_stream_close_async);
2038   g_task_set_priority (task, io_priority);
2039
2040   if (!g_output_stream_set_pending (stream, &error))
2041     {
2042       g_task_return_error (task, error);
2043       g_object_unref (task);
2044       return;
2045     }
2046
2047   g_output_stream_internal_close_async (stream, io_priority, cancellable,
2048                                         real_close_async_cb, task);
2049 }
2050
2051 /* Must always be called inside
2052  * g_output_stream_set_pending()/g_output_stream_clear_pending().
2053  */
2054 void
2055 g_output_stream_internal_close_async (GOutputStream       *stream,
2056                                       int                  io_priority,
2057                                       GCancellable        *cancellable,
2058                                       GAsyncReadyCallback  callback,
2059                                       gpointer             user_data)
2060 {
2061   GOutputStreamClass *class;
2062   GTask *task;
2063
2064   task = g_task_new (stream, cancellable, callback, user_data);
2065   g_task_set_source_tag (task, g_output_stream_internal_close_async);
2066   g_task_set_priority (task, io_priority);
2067
2068   if (stream->priv->closed)
2069     {
2070       g_task_return_boolean (task, TRUE);
2071       g_object_unref (task);
2072       return;
2073     }
2074
2075   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2076   stream->priv->closing = TRUE;
2077
2078   /* Call close_async directly if there is no need to flush, or if the flush
2079      can be done sync (in the output stream async close thread) */
2080   if (class->flush_async == NULL ||
2081       (class->flush_async == g_output_stream_real_flush_async &&
2082        (class->flush == NULL || class->close_async == g_output_stream_real_close_async)))
2083     {
2084       class->close_async (stream, io_priority, cancellable,
2085                           async_ready_close_callback_wrapper, task);
2086     }
2087   else
2088     {
2089       /* First do an async flush, then do the async close in the callback
2090          wrapper (see async_ready_close_flushed_callback_wrapper) */
2091       class->flush_async (stream, io_priority, cancellable,
2092                           async_ready_close_flushed_callback_wrapper, task);
2093     }
2094 }
2095
2096 static gboolean
2097 g_output_stream_internal_close_finish (GOutputStream  *stream,
2098                                        GAsyncResult   *result,
2099                                        GError        **error)
2100 {
2101   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2102   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2103   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_internal_close_async), FALSE);
2104
2105   return g_task_propagate_boolean (G_TASK (result), error);
2106 }
2107
2108 /**
2109  * g_output_stream_close_finish:
2110  * @stream: a #GOutputStream.
2111  * @result: a #GAsyncResult.
2112  * @error: a #GError location to store the error occurring, or %NULL to 
2113  * ignore.
2114  * 
2115  * Closes an output stream.
2116  * 
2117  * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
2118  **/
2119 gboolean
2120 g_output_stream_close_finish (GOutputStream  *stream,
2121                               GAsyncResult   *result,
2122                               GError        **error)
2123 {
2124   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2125   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2126   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_close_async), FALSE);
2127
2128   /* @result is always the GTask created by g_output_stream_close_async();
2129    * we called class->close_finish() from async_ready_close_callback_wrapper.
2130    */
2131   return g_task_propagate_boolean (G_TASK (result), error);
2132 }
2133
2134 /**
2135  * g_output_stream_is_closed:
2136  * @stream: a #GOutputStream.
2137  * 
2138  * Checks if an output stream has already been closed.
2139  * 
2140  * Returns: %TRUE if @stream is closed. %FALSE otherwise. 
2141  **/
2142 gboolean
2143 g_output_stream_is_closed (GOutputStream *stream)
2144 {
2145   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
2146   
2147   return stream->priv->closed;
2148 }
2149
2150 /**
2151  * g_output_stream_is_closing:
2152  * @stream: a #GOutputStream.
2153  *
2154  * Checks if an output stream is being closed. This can be
2155  * used inside e.g. a flush implementation to see if the
2156  * flush (or other i/o operation) is called from within
2157  * the closing operation.
2158  *
2159  * Returns: %TRUE if @stream is being closed. %FALSE otherwise.
2160  *
2161  * Since: 2.24
2162  **/
2163 gboolean
2164 g_output_stream_is_closing (GOutputStream *stream)
2165 {
2166   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
2167
2168   return stream->priv->closing;
2169 }
2170
2171 /**
2172  * g_output_stream_has_pending:
2173  * @stream: a #GOutputStream.
2174  * 
2175  * Checks if an output stream has pending actions.
2176  * 
2177  * Returns: %TRUE if @stream has pending actions. 
2178  **/
2179 gboolean
2180 g_output_stream_has_pending (GOutputStream *stream)
2181 {
2182   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2183   
2184   return stream->priv->pending;
2185 }
2186
2187 /**
2188  * g_output_stream_set_pending:
2189  * @stream: a #GOutputStream.
2190  * @error: a #GError location to store the error occurring, or %NULL to 
2191  * ignore.
2192  * 
2193  * Sets @stream to have actions pending. If the pending flag is
2194  * already set or @stream is closed, it will return %FALSE and set
2195  * @error.
2196  *
2197  * Returns: %TRUE if pending was previously unset and is now set.
2198  **/
2199 gboolean
2200 g_output_stream_set_pending (GOutputStream *stream,
2201                              GError **error)
2202 {
2203   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2204   
2205   if (stream->priv->closed)
2206     {
2207       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
2208                            _("Stream is already closed"));
2209       return FALSE;
2210     }
2211   
2212   if (stream->priv->pending)
2213     {
2214       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
2215                            /* Translators: This is an error you get if there is
2216                             * already an operation running against this stream when
2217                             * you try to start one */
2218                            _("Stream has outstanding operation"));
2219       return FALSE;
2220     }
2221   
2222   stream->priv->pending = TRUE;
2223   return TRUE;
2224 }
2225
2226 /**
2227  * g_output_stream_clear_pending:
2228  * @stream: output stream
2229  * 
2230  * Clears the pending flag on @stream.
2231  **/
2232 void
2233 g_output_stream_clear_pending (GOutputStream *stream)
2234 {
2235   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
2236   
2237   stream->priv->pending = FALSE;
2238 }
2239
2240 /*< internal >
2241  * g_output_stream_async_write_is_via_threads:
2242  * @stream: a #GOutputStream.
2243  *
2244  * Checks if an output stream's write_async function uses threads.
2245  *
2246  * Returns: %TRUE if @stream's write_async function uses threads.
2247  **/
2248 gboolean
2249 g_output_stream_async_write_is_via_threads (GOutputStream *stream)
2250 {
2251   GOutputStreamClass *class;
2252
2253   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2254
2255   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2256
2257   return (class->write_async == g_output_stream_real_write_async &&
2258       !(G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
2259         g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
2260 }
2261
2262 /*< internal >
2263  * g_output_stream_async_writev_is_via_threads:
2264  * @stream: a #GOutputStream.
2265  *
2266  * Checks if an output stream's writev_async function uses threads.
2267  *
2268  * Returns: %TRUE if @stream's writev_async function uses threads.
2269  **/
2270 gboolean
2271 g_output_stream_async_writev_is_via_threads (GOutputStream *stream)
2272 {
2273   GOutputStreamClass *class;
2274
2275   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2276
2277   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2278
2279   return (class->writev_async == g_output_stream_real_writev_async &&
2280       !(G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
2281         g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
2282 }
2283
2284 /*< internal >
2285  * g_output_stream_async_close_is_via_threads:
2286  * @stream: output stream
2287  *
2288  * Checks if an output stream's close_async function uses threads.
2289  *
2290  * Returns: %TRUE if @stream's close_async function uses threads.
2291  **/
2292 gboolean
2293 g_output_stream_async_close_is_via_threads (GOutputStream *stream)
2294 {
2295   GOutputStreamClass *class;
2296
2297   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
2298
2299   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2300
2301   return class->close_async == g_output_stream_real_close_async;
2302 }
2303
2304 /********************************************
2305  *   Default implementation of sync ops    *
2306  ********************************************/
2307 static gboolean
2308 g_output_stream_real_writev (GOutputStream         *stream,
2309                              const GOutputVector   *vectors,
2310                              gsize                  n_vectors,
2311                              gsize                 *bytes_written,
2312                              GCancellable          *cancellable,
2313                              GError               **error)
2314 {
2315   GOutputStreamClass *class;
2316   gsize _bytes_written = 0;
2317   gsize i;
2318   GError *err = NULL;
2319
2320   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2321
2322   if (bytes_written)
2323     *bytes_written = 0;
2324
2325   for (i = 0; i < n_vectors; i++)
2326     {
2327       gssize res = 0;
2328
2329       /* Would we overflow here? In that case simply return and let the caller
2330        * handle this like a short write */
2331       if (_bytes_written > G_MAXSIZE - vectors[i].size)
2332         break;
2333
2334       res = class->write_fn (stream, vectors[i].buffer, vectors[i].size, cancellable, &err);
2335
2336       if (res == -1)
2337         {
2338           /* If we already wrote something  we handle this like a short write
2339            * and assume that on the next call the same error happens again, or
2340            * everything finishes successfully without data loss then
2341            */
2342           if (_bytes_written > 0)
2343             {
2344               if (bytes_written)
2345                 *bytes_written = _bytes_written;
2346
2347               g_clear_error (&err);
2348               return TRUE;
2349             }
2350
2351           g_propagate_error (error, err);
2352           return FALSE;
2353         }
2354
2355       _bytes_written += res;
2356       /* if we had a short write break the loop here */
2357       if ((gsize) res < vectors[i].size)
2358         break;
2359     }
2360
2361   if (bytes_written)
2362     *bytes_written = _bytes_written;
2363
2364   return TRUE;
2365 }
2366
2367 /********************************************
2368  *   Default implementation of async ops    *
2369  ********************************************/
2370
2371 typedef struct {
2372   const void         *buffer;
2373   gsize               count_requested;
2374   gssize              count_written;
2375 } WriteData;
2376
2377 static void
2378 free_write_data (WriteData *op)
2379 {
2380   g_slice_free (WriteData, op);
2381 }
2382
2383 static void
2384 write_async_thread (GTask        *task,
2385                     gpointer      source_object,
2386                     gpointer      task_data,
2387                     GCancellable *cancellable)
2388 {
2389   GOutputStream *stream = source_object;
2390   WriteData *op = task_data;
2391   GOutputStreamClass *class;
2392   GError *error = NULL;
2393   gssize count_written;
2394
2395   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2396   count_written = class->write_fn (stream, op->buffer, op->count_requested,
2397                                    cancellable, &error);
2398   if (count_written == -1)
2399     g_task_return_error (task, error);
2400   else
2401     g_task_return_int (task, count_written);
2402 }
2403
2404 static void write_async_pollable (GPollableOutputStream *stream,
2405                                   GTask                 *task);
2406
2407 static gboolean
2408 write_async_pollable_ready (GPollableOutputStream *stream,
2409                             gpointer               user_data)
2410 {
2411   GTask *task = user_data;
2412
2413   write_async_pollable (stream, task);
2414   return FALSE;
2415 }
2416
2417 static void
2418 write_async_pollable (GPollableOutputStream *stream,
2419                       GTask                 *task)
2420 {
2421   GError *error = NULL;
2422   WriteData *op = g_task_get_task_data (task);
2423   gssize count_written;
2424
2425   if (g_task_return_error_if_cancelled (task))
2426     return;
2427
2428   count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
2429     write_nonblocking (stream, op->buffer, op->count_requested, &error);
2430
2431   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
2432     {
2433       GSource *source;
2434
2435       g_error_free (error);
2436
2437       source = g_pollable_output_stream_create_source (stream,
2438                                                        g_task_get_cancellable (task));
2439       g_task_attach_source (task, source,
2440                             (GSourceFunc) write_async_pollable_ready);
2441       g_source_unref (source);
2442       return;
2443     }
2444
2445   if (count_written == -1)
2446     g_task_return_error (task, error);
2447   else
2448     g_task_return_int (task, count_written);
2449 }
2450
2451 static void
2452 g_output_stream_real_write_async (GOutputStream       *stream,
2453                                   const void          *buffer,
2454                                   gsize                count,
2455                                   int                  io_priority,
2456                                   GCancellable        *cancellable,
2457                                   GAsyncReadyCallback  callback,
2458                                   gpointer             user_data)
2459 {
2460   GTask *task;
2461   WriteData *op;
2462
2463   op = g_slice_new0 (WriteData);
2464   task = g_task_new (stream, cancellable, callback, user_data);
2465   g_task_set_check_cancellable (task, FALSE);
2466   g_task_set_task_data (task, op, (GDestroyNotify) free_write_data);
2467   op->buffer = buffer;
2468   op->count_requested = count;
2469
2470   if (!g_output_stream_async_write_is_via_threads (stream))
2471     write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
2472   else
2473     g_task_run_in_thread (task, write_async_thread);
2474   g_object_unref (task);
2475 }
2476
2477 static gssize
2478 g_output_stream_real_write_finish (GOutputStream  *stream,
2479                                    GAsyncResult   *result,
2480                                    GError        **error)
2481 {
2482   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2483
2484   return g_task_propagate_int (G_TASK (result), error);
2485 }
2486
2487 typedef struct {
2488   const GOutputVector *vectors;
2489   gsize                n_vectors; /* (unowned) */
2490   gsize                bytes_written;
2491 } WritevData;
2492
2493 static void
2494 free_writev_data (WritevData *op)
2495 {
2496   g_slice_free (WritevData, op);
2497 }
2498
2499 static void
2500 writev_async_thread (GTask        *task,
2501                      gpointer      source_object,
2502                      gpointer      task_data,
2503                      GCancellable *cancellable)
2504 {
2505   GOutputStream *stream = source_object;
2506   WritevData *op = task_data;
2507   GOutputStreamClass *class;
2508   GError *error = NULL;
2509   gboolean res;
2510
2511   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2512   res = class->writev_fn (stream, op->vectors, op->n_vectors,
2513                           &op->bytes_written, cancellable, &error);
2514
2515   g_warn_if_fail (res || op->bytes_written == 0);
2516   g_warn_if_fail (res || error != NULL);
2517
2518   if (!res)
2519     g_task_return_error (task, g_steal_pointer (&error));
2520   else
2521     g_task_return_boolean (task, TRUE);
2522 }
2523
2524 static void writev_async_pollable (GPollableOutputStream *stream,
2525                                    GTask                 *task);
2526
2527 static gboolean
2528 writev_async_pollable_ready (GPollableOutputStream *stream,
2529                              gpointer               user_data)
2530 {
2531   GTask *task = user_data;
2532
2533   writev_async_pollable (stream, task);
2534   return G_SOURCE_REMOVE;
2535 }
2536
2537 static void
2538 writev_async_pollable (GPollableOutputStream *stream,
2539                        GTask                 *task)
2540 {
2541   GError *error = NULL;
2542   WritevData *op = g_task_get_task_data (task);
2543   GPollableReturn res;
2544   gsize bytes_written = 0;
2545
2546   if (g_task_return_error_if_cancelled (task))
2547     return;
2548
2549   res = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
2550     writev_nonblocking (stream, op->vectors, op->n_vectors, &bytes_written, &error);
2551
2552   switch (res)
2553     {
2554     case G_POLLABLE_RETURN_WOULD_BLOCK:
2555         {
2556           GSource *source;
2557
2558           g_warn_if_fail (error == NULL);
2559           g_warn_if_fail (bytes_written == 0);
2560
2561           source = g_pollable_output_stream_create_source (stream,
2562                                                            g_task_get_cancellable (task));
2563           g_task_attach_source (task, source,
2564                                 (GSourceFunc) writev_async_pollable_ready);
2565           g_source_unref (source);
2566         }
2567         break;
2568       case G_POLLABLE_RETURN_OK:
2569         g_warn_if_fail (error == NULL);
2570         op->bytes_written = bytes_written;
2571         g_task_return_boolean (task, TRUE);
2572         break;
2573       case G_POLLABLE_RETURN_FAILED:
2574         g_warn_if_fail (bytes_written == 0);
2575         g_warn_if_fail (error != NULL);
2576         g_task_return_error (task, g_steal_pointer (&error));
2577         break;
2578       default:
2579         g_assert_not_reached ();
2580     }
2581 }
2582
2583 static void
2584 g_output_stream_real_writev_async (GOutputStream        *stream,
2585                                    const GOutputVector  *vectors,
2586                                    gsize                 n_vectors,
2587                                    int                   io_priority,
2588                                    GCancellable         *cancellable,
2589                                    GAsyncReadyCallback   callback,
2590                                    gpointer              user_data)
2591 {
2592   GTask *task;
2593   WritevData *op;
2594   GError *error = NULL;
2595
2596   op = g_slice_new0 (WritevData);
2597   task = g_task_new (stream, cancellable, callback, user_data);
2598   op->vectors = vectors;
2599   op->n_vectors = n_vectors;
2600
2601   g_task_set_check_cancellable (task, FALSE);
2602   g_task_set_source_tag (task, g_output_stream_writev_async);
2603   g_task_set_priority (task, io_priority);
2604   g_task_set_task_data (task, op, (GDestroyNotify) free_writev_data);
2605
2606   if (n_vectors == 0)
2607     {
2608       g_task_return_boolean (task, TRUE);
2609       g_object_unref (task);
2610       return;
2611     }
2612
2613   if (!g_output_stream_set_pending (stream, &error))
2614     {
2615       g_task_return_error (task, g_steal_pointer (&error));
2616       g_object_unref (task);
2617       return;
2618     }
2619
2620   if (!g_output_stream_async_writev_is_via_threads (stream))
2621     writev_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
2622   else
2623     g_task_run_in_thread (task, writev_async_thread);
2624
2625   g_object_unref (task);
2626 }
2627
2628 static gboolean
2629 g_output_stream_real_writev_finish (GOutputStream   *stream,
2630                                     GAsyncResult    *result,
2631                                     gsize           *bytes_written,
2632                                     GError         **error)
2633 {
2634   GTask *task;
2635
2636   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2637   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_writev_async), FALSE);
2638
2639   g_output_stream_clear_pending (stream);
2640
2641   task = G_TASK (result);
2642
2643   if (bytes_written)
2644     {
2645       WritevData *op = g_task_get_task_data (task);
2646
2647       *bytes_written = op->bytes_written;
2648     }
2649
2650   return g_task_propagate_boolean (task, error);
2651 }
2652
2653 typedef struct {
2654   GInputStream *source;
2655   GOutputStreamSpliceFlags flags;
2656   guint istream_closed : 1;
2657   guint ostream_closed : 1;
2658   gssize n_read;
2659   gssize n_written;
2660   gsize bytes_copied;
2661   GError *error;
2662   guint8 *buffer;
2663 } SpliceData;
2664
2665 static void
2666 free_splice_data (SpliceData *op)
2667 {
2668   g_clear_pointer (&op->buffer, g_free);
2669   g_object_unref (op->source);
2670   g_clear_error (&op->error);
2671   g_free (op);
2672 }
2673
2674 static void
2675 real_splice_async_complete_cb (GTask *task)
2676 {
2677   SpliceData *op = g_task_get_task_data (task);
2678
2679   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE &&
2680       !op->istream_closed)
2681     return;
2682
2683   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET &&
2684       !op->ostream_closed)
2685     return;
2686
2687   if (op->error != NULL)
2688     {
2689       g_task_return_error (task, op->error);
2690       op->error = NULL;
2691     }
2692   else
2693     {
2694       g_task_return_int (task, op->bytes_copied);
2695     }
2696
2697   g_object_unref (task);
2698 }
2699
2700 static void
2701 real_splice_async_close_input_cb (GObject      *source,
2702                                   GAsyncResult *res,
2703                                   gpointer      user_data)
2704 {
2705   GTask *task = user_data;
2706   SpliceData *op = g_task_get_task_data (task);
2707
2708   g_input_stream_close_finish (G_INPUT_STREAM (source), res, NULL);
2709   op->istream_closed = TRUE;
2710
2711   real_splice_async_complete_cb (task);
2712 }
2713
2714 static void
2715 real_splice_async_close_output_cb (GObject      *source,
2716                                    GAsyncResult *res,
2717                                    gpointer      user_data)
2718 {
2719   GTask *task = G_TASK (user_data);
2720   SpliceData *op = g_task_get_task_data (task);
2721   GError **error = (op->error == NULL) ? &op->error : NULL;
2722
2723   g_output_stream_internal_close_finish (G_OUTPUT_STREAM (source), res, error);
2724   op->ostream_closed = TRUE;
2725
2726   real_splice_async_complete_cb (task);
2727 }
2728
2729 static void
2730 real_splice_async_complete (GTask *task)
2731 {
2732   SpliceData *op = g_task_get_task_data (task);
2733   gboolean done = TRUE;
2734
2735   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
2736     {
2737       done = FALSE;
2738       g_input_stream_close_async (op->source, g_task_get_priority (task),
2739                                   g_task_get_cancellable (task),
2740                                   real_splice_async_close_input_cb, task);
2741     }
2742
2743   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
2744     {
2745       done = FALSE;
2746       g_output_stream_internal_close_async (g_task_get_source_object (task),
2747                                             g_task_get_priority (task),
2748                                             g_task_get_cancellable (task),
2749                                             real_splice_async_close_output_cb,
2750                                             task);
2751     }
2752
2753   if (done)
2754     real_splice_async_complete_cb (task);
2755 }
2756
2757 static void real_splice_async_read_cb (GObject      *source,
2758                                        GAsyncResult *res,
2759                                        gpointer      user_data);
2760
2761 static void
2762 real_splice_async_write_cb (GObject      *source,
2763                             GAsyncResult *res,
2764                             gpointer      user_data)
2765 {
2766   GOutputStreamClass *class;
2767   GTask *task = G_TASK (user_data);
2768   SpliceData *op = g_task_get_task_data (task);
2769   gssize ret;
2770
2771   class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
2772
2773   ret = class->write_finish (G_OUTPUT_STREAM (source), res, &op->error);
2774
2775   if (ret == -1)
2776     {
2777       real_splice_async_complete (task);
2778       return;
2779     }
2780
2781   op->n_written += ret;
2782   op->bytes_copied += ret;
2783   if (op->bytes_copied > G_MAXSSIZE)
2784     op->bytes_copied = G_MAXSSIZE;
2785
2786   if (op->n_written < op->n_read)
2787     {
2788       class->write_async (g_task_get_source_object (task),
2789                           op->buffer + op->n_written,
2790                           op->n_read - op->n_written,
2791                           g_task_get_priority (task),
2792                           g_task_get_cancellable (task),
2793                           real_splice_async_write_cb, task);
2794       return;
2795     }
2796
2797   g_input_stream_read_async (op->source, op->buffer, 8192,
2798                              g_task_get_priority (task),
2799                              g_task_get_cancellable (task),
2800                              real_splice_async_read_cb, task);
2801 }
2802
2803 static void
2804 real_splice_async_read_cb (GObject      *source,
2805                            GAsyncResult *res,
2806                            gpointer      user_data)
2807 {
2808   GOutputStreamClass *class;
2809   GTask *task = G_TASK (user_data);
2810   SpliceData *op = g_task_get_task_data (task);
2811   gssize ret;
2812
2813   class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
2814
2815   ret = g_input_stream_read_finish (op->source, res, &op->error);
2816   if (ret == -1 || ret == 0)
2817     {
2818       real_splice_async_complete (task);
2819       return;
2820     }
2821
2822   op->n_read = ret;
2823   op->n_written = 0;
2824
2825   class->write_async (g_task_get_source_object (task), op->buffer,
2826                       op->n_read, g_task_get_priority (task),
2827                       g_task_get_cancellable (task),
2828                       real_splice_async_write_cb, task);
2829 }
2830
2831 static void
2832 splice_async_thread (GTask        *task,
2833                      gpointer      source_object,
2834                      gpointer      task_data,
2835                      GCancellable *cancellable)
2836 {
2837   GOutputStream *stream = source_object;
2838   SpliceData *op = task_data;
2839   GOutputStreamClass *class;
2840   GError *error = NULL;
2841   gssize bytes_copied;
2842
2843   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2844   
2845   bytes_copied = class->splice (stream,
2846                                 op->source,
2847                                 op->flags,
2848                                 cancellable,
2849                                 &error);
2850   if (bytes_copied == -1)
2851     g_task_return_error (task, error);
2852   else
2853     g_task_return_int (task, bytes_copied);
2854 }
2855
2856 static void
2857 g_output_stream_real_splice_async (GOutputStream             *stream,
2858                                    GInputStream              *source,
2859                                    GOutputStreamSpliceFlags   flags,
2860                                    int                        io_priority,
2861                                    GCancellable              *cancellable,
2862                                    GAsyncReadyCallback        callback,
2863                                    gpointer                   user_data)
2864 {
2865   GTask *task;
2866   SpliceData *op;
2867
2868   op = g_new0 (SpliceData, 1);
2869   task = g_task_new (stream, cancellable, callback, user_data);
2870   g_task_set_task_data (task, op, (GDestroyNotify)free_splice_data);
2871   op->flags = flags;
2872   op->source = g_object_ref (source);
2873
2874   if (g_input_stream_async_read_is_via_threads (source) &&
2875       g_output_stream_async_write_is_via_threads (stream))
2876     {
2877       g_task_run_in_thread (task, splice_async_thread);
2878       g_object_unref (task);
2879     }
2880   else
2881     {
2882       op->buffer = g_malloc (8192);
2883       g_input_stream_read_async (op->source, op->buffer, 8192,
2884                                  g_task_get_priority (task),
2885                                  g_task_get_cancellable (task),
2886                                  real_splice_async_read_cb, task);
2887     }
2888 }
2889
2890 static gssize
2891 g_output_stream_real_splice_finish (GOutputStream  *stream,
2892                                     GAsyncResult   *result,
2893                                     GError        **error)
2894 {
2895   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2896
2897   return g_task_propagate_int (G_TASK (result), error);
2898 }
2899
2900
2901 static void
2902 flush_async_thread (GTask        *task,
2903                     gpointer      source_object,
2904                     gpointer      task_data,
2905                     GCancellable *cancellable)
2906 {
2907   GOutputStream *stream = source_object;
2908   GOutputStreamClass *class;
2909   gboolean result;
2910   GError *error = NULL;
2911
2912   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2913   result = TRUE;
2914   if (class->flush)
2915     result = class->flush (stream, cancellable, &error);
2916
2917   if (result)
2918     g_task_return_boolean (task, TRUE);
2919   else
2920     g_task_return_error (task, error);
2921 }
2922
2923 static void
2924 g_output_stream_real_flush_async (GOutputStream       *stream,
2925                                   int                  io_priority,
2926                                   GCancellable        *cancellable,
2927                                   GAsyncReadyCallback  callback,
2928                                   gpointer             user_data)
2929 {
2930   GTask *task;
2931
2932   task = g_task_new (stream, cancellable, callback, user_data);
2933   g_task_set_priority (task, io_priority);
2934   g_task_run_in_thread (task, flush_async_thread);
2935   g_object_unref (task);
2936 }
2937
2938 static gboolean
2939 g_output_stream_real_flush_finish (GOutputStream  *stream,
2940                                    GAsyncResult   *result,
2941                                    GError        **error)
2942 {
2943   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2944
2945   return g_task_propagate_boolean (G_TASK (result), error);
2946 }
2947
2948 static void
2949 close_async_thread (GTask        *task,
2950                     gpointer      source_object,
2951                     gpointer      task_data,
2952                     GCancellable *cancellable)
2953 {
2954   GOutputStream *stream = source_object;
2955   GOutputStreamClass *class;
2956   GError *error = NULL;
2957   gboolean result = TRUE;
2958
2959   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2960
2961   /* Do a flush here if there is a flush function, and we did not have to do
2962    * an async flush before (see g_output_stream_close_async)
2963    */
2964   if (class->flush != NULL &&
2965       (class->flush_async == NULL ||
2966        class->flush_async == g_output_stream_real_flush_async))
2967     {
2968       result = class->flush (stream, cancellable, &error);
2969     }
2970
2971   /* Auto handling of cancellation disabled, and ignore
2972      cancellation, since we want to close things anyway, although
2973      possibly in a quick-n-dirty way. At least we never want to leak
2974      open handles */
2975
2976   if (class->close_fn)
2977     {
2978       /* Make sure to close, even if the flush failed (see sync close) */
2979       if (!result)
2980         class->close_fn (stream, cancellable, NULL);
2981       else
2982         result = class->close_fn (stream, cancellable, &error);
2983     }
2984
2985   if (result)
2986     g_task_return_boolean (task, TRUE);
2987   else
2988     g_task_return_error (task, error);
2989 }
2990
2991 static void
2992 g_output_stream_real_close_async (GOutputStream       *stream,
2993                                   int                  io_priority,
2994                                   GCancellable        *cancellable,
2995                                   GAsyncReadyCallback  callback,
2996                                   gpointer             user_data)
2997 {
2998   GTask *task;
2999
3000   task = g_task_new (stream, cancellable, callback, user_data);
3001   g_task_set_priority (task, io_priority);
3002   g_task_run_in_thread (task, close_async_thread);
3003   g_object_unref (task);
3004 }
3005
3006 static gboolean
3007 g_output_stream_real_close_finish (GOutputStream  *stream,
3008                                    GAsyncResult   *result,
3009                                    GError        **error)
3010 {
3011   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
3012
3013   return g_task_propagate_boolean (G_TASK (result), error);
3014 }