Add g_output_stream_write_all_async()
[platform/upstream/glib.git] / gio / goutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: Alexander Larsson <alexl@redhat.com>
19  */
20
21 #include "config.h"
22 #include <string.h>
23 #include "goutputstream.h"
24 #include "gcancellable.h"
25 #include "gasyncresult.h"
26 #include "gtask.h"
27 #include "ginputstream.h"
28 #include "gioerror.h"
29 #include "gioprivate.h"
30 #include "glibintl.h"
31 #include "gpollableoutputstream.h"
32
33 /**
34  * SECTION:goutputstream
35  * @short_description: Base class for implementing streaming output
36  * @include: gio/gio.h
37  *
38  * #GOutputStream has functions to write to a stream (g_output_stream_write()),
39  * to close a stream (g_output_stream_close()) and to flush pending writes
40  * (g_output_stream_flush()). 
41  *
42  * To copy the content of an input stream to an output stream without 
43  * manually handling the reads and writes, use g_output_stream_splice(). 
44  *
45  * All of these functions have async variants too.
46  **/
47
48 struct _GOutputStreamPrivate {
49   guint closed : 1;
50   guint pending : 1;
51   guint closing : 1;
52   GAsyncReadyCallback outstanding_callback;
53 };
54
55 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GOutputStream, g_output_stream, G_TYPE_OBJECT)
56
57 static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
58                                                     GInputStream              *source,
59                                                     GOutputStreamSpliceFlags   flags,
60                                                     GCancellable              *cancellable,
61                                                     GError                   **error);
62 static void     g_output_stream_real_write_async   (GOutputStream             *stream,
63                                                     const void                *buffer,
64                                                     gsize                      count,
65                                                     int                        io_priority,
66                                                     GCancellable              *cancellable,
67                                                     GAsyncReadyCallback        callback,
68                                                     gpointer                   data);
69 static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
70                                                     GAsyncResult              *result,
71                                                     GError                   **error);
72 static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
73                                                     GInputStream              *source,
74                                                     GOutputStreamSpliceFlags   flags,
75                                                     int                        io_priority,
76                                                     GCancellable              *cancellable,
77                                                     GAsyncReadyCallback        callback,
78                                                     gpointer                   data);
79 static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
80                                                     GAsyncResult              *result,
81                                                     GError                   **error);
82 static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
83                                                     int                        io_priority,
84                                                     GCancellable              *cancellable,
85                                                     GAsyncReadyCallback        callback,
86                                                     gpointer                   data);
87 static gboolean g_output_stream_real_flush_finish  (GOutputStream             *stream,
88                                                     GAsyncResult              *result,
89                                                     GError                   **error);
90 static void     g_output_stream_real_close_async   (GOutputStream             *stream,
91                                                     int                        io_priority,
92                                                     GCancellable              *cancellable,
93                                                     GAsyncReadyCallback        callback,
94                                                     gpointer                   data);
95 static gboolean g_output_stream_real_close_finish  (GOutputStream             *stream,
96                                                     GAsyncResult              *result,
97                                                     GError                   **error);
98 static gboolean g_output_stream_internal_close     (GOutputStream             *stream,
99                                                     GCancellable              *cancellable,
100                                                     GError                   **error);
101 static void     g_output_stream_internal_close_async (GOutputStream           *stream,
102                                                       int                      io_priority,
103                                                       GCancellable            *cancellable,
104                                                       GAsyncReadyCallback      callback,
105                                                       gpointer                 data);
106 static gboolean g_output_stream_internal_close_finish (GOutputStream          *stream,
107                                                        GAsyncResult           *result,
108                                                        GError                **error);
109
110 static void
111 g_output_stream_dispose (GObject *object)
112 {
113   GOutputStream *stream;
114
115   stream = G_OUTPUT_STREAM (object);
116   
117   if (!stream->priv->closed)
118     g_output_stream_close (stream, NULL, NULL);
119
120   G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object);
121 }
122
123 static void
124 g_output_stream_class_init (GOutputStreamClass *klass)
125 {
126   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
127
128   gobject_class->dispose = g_output_stream_dispose;
129
130   klass->splice = g_output_stream_real_splice;
131   
132   klass->write_async = g_output_stream_real_write_async;
133   klass->write_finish = g_output_stream_real_write_finish;
134   klass->splice_async = g_output_stream_real_splice_async;
135   klass->splice_finish = g_output_stream_real_splice_finish;
136   klass->flush_async = g_output_stream_real_flush_async;
137   klass->flush_finish = g_output_stream_real_flush_finish;
138   klass->close_async = g_output_stream_real_close_async;
139   klass->close_finish = g_output_stream_real_close_finish;
140 }
141
142 static void
143 g_output_stream_init (GOutputStream *stream)
144 {
145   stream->priv = g_output_stream_get_instance_private (stream);
146 }
147
148 /**
149  * g_output_stream_write:
150  * @stream: a #GOutputStream.
151  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
152  * @count: the number of bytes to write
153  * @cancellable: (allow-none): optional cancellable object
154  * @error: location to store the error occurring, or %NULL to ignore
155  *
156  * Tries to write @count bytes from @buffer into the stream. Will block
157  * during the operation.
158  * 
159  * If count is 0, returns 0 and does nothing. A value of @count
160  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
161  *
162  * On success, the number of bytes written to the stream is returned.
163  * It is not an error if this is not the same as the requested size, as it
164  * can happen e.g. on a partial I/O error, or if there is not enough
165  * storage in the stream. All writes block until at least one byte
166  * is written or an error occurs; 0 is never returned (unless
167  * @count is 0).
168  * 
169  * If @cancellable is not %NULL, then the operation can be cancelled by
170  * triggering the cancellable object from another thread. If the operation
171  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
172  * operation was partially finished when the operation was cancelled the
173  * partial result will be returned, without an error.
174  *
175  * On error -1 is returned and @error is set accordingly.
176  * 
177  * Virtual: write_fn
178  *
179  * Returns: Number of bytes written, or -1 on error
180  **/
181 gssize
182 g_output_stream_write (GOutputStream  *stream,
183                        const void     *buffer,
184                        gsize           count,
185                        GCancellable   *cancellable,
186                        GError        **error)
187 {
188   GOutputStreamClass *class;
189   gssize res;
190
191   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
192   g_return_val_if_fail (buffer != NULL, 0);
193
194   if (count == 0)
195     return 0;
196   
197   if (((gssize) count) < 0)
198     {
199       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
200                    _("Too large count value passed to %s"), G_STRFUNC);
201       return -1;
202     }
203
204   class = G_OUTPUT_STREAM_GET_CLASS (stream);
205
206   if (class->write_fn == NULL) 
207     {
208       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
209                            _("Output stream doesn't implement write"));
210       return -1;
211     }
212   
213   if (!g_output_stream_set_pending (stream, error))
214     return -1;
215   
216   if (cancellable)
217     g_cancellable_push_current (cancellable);
218   
219   res = class->write_fn (stream, buffer, count, cancellable, error);
220   
221   if (cancellable)
222     g_cancellable_pop_current (cancellable);
223   
224   g_output_stream_clear_pending (stream);
225
226   return res; 
227 }
228
229 /**
230  * g_output_stream_write_all:
231  * @stream: a #GOutputStream.
232  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
233  * @count: the number of bytes to write
234  * @bytes_written: (out): location to store the number of bytes that was 
235  *     written to the stream
236  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
237  * @error: location to store the error occurring, or %NULL to ignore
238  *
239  * Tries to write @count bytes from @buffer into the stream. Will block
240  * during the operation.
241  * 
242  * This function is similar to g_output_stream_write(), except it tries to
243  * write as many bytes as requested, only stopping on an error.
244  *
245  * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
246  * is set to @count.
247  * 
248  * If there is an error during the operation %FALSE is returned and @error
249  * is set to indicate the error status, @bytes_written is updated to contain
250  * the number of bytes written into the stream before the error occurred.
251  *
252  * Returns: %TRUE on success, %FALSE if there was an error
253  **/
254 gboolean
255 g_output_stream_write_all (GOutputStream  *stream,
256                            const void     *buffer,
257                            gsize           count,
258                            gsize          *bytes_written,
259                            GCancellable   *cancellable,
260                            GError        **error)
261 {
262   gsize _bytes_written;
263   gssize res;
264
265   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
266   g_return_val_if_fail (buffer != NULL, FALSE);
267
268   _bytes_written = 0;
269   while (_bytes_written < count)
270     {
271       res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
272                                    cancellable, error);
273       if (res == -1)
274         {
275           if (bytes_written)
276             *bytes_written = _bytes_written;
277           return FALSE;
278         }
279       
280       if (res == 0)
281         g_warning ("Write returned zero without error");
282
283       _bytes_written += res;
284     }
285   
286   if (bytes_written)
287     *bytes_written = _bytes_written;
288
289   return TRUE;
290 }
291
292 /**
293  * g_output_stream_printf:
294  * @stream: a #GOutputStream.
295  * @bytes_written: (out): location to store the number of bytes that was
296  *     written to the stream
297  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
298  * @error: location to store the error occurring, or %NULL to ignore
299  * @format: the format string. See the printf() documentation
300  * @...: the parameters to insert into the format string
301  *
302  * This is a utility function around g_output_stream_write_all(). It
303  * uses g_strdup_vprintf() to turn @format and @... into a string that
304  * is then written to @stream.
305  *
306  * See the documentation of g_output_stream_write_all() about the
307  * behavior of the actual write operation.
308  *
309  * Note that partial writes cannot be properly checked with this
310  * function due to the variable length of the written string, if you
311  * need precise control over partial write failures, you need to
312  * create you own printf()-like wrapper around g_output_stream_write()
313  * or g_output_stream_write_all().
314  *
315  * Since: 2.40
316  *
317  * Returns: %TRUE on success, %FALSE if there was an error
318  **/
319 gboolean
320 g_output_stream_printf (GOutputStream  *stream,
321                         gsize          *bytes_written,
322                         GCancellable   *cancellable,
323                         GError        **error,
324                         const gchar    *format,
325                         ...)
326 {
327   va_list  args;
328   gboolean success;
329
330   va_start (args, format);
331   success = g_output_stream_vprintf (stream, bytes_written, cancellable,
332                                      error, format, args);
333   va_end (args);
334
335   return success;
336 }
337
338 /**
339  * g_output_stream_vprintf:
340  * @stream: a #GOutputStream.
341  * @bytes_written: (out): location to store the number of bytes that was
342  *     written to the stream
343  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
344  * @error: location to store the error occurring, or %NULL to ignore
345  * @format: the format string. See the printf() documentation
346  * @args: the parameters to insert into the format string
347  *
348  * This is a utility function around g_output_stream_write_all(). It
349  * uses g_strdup_vprintf() to turn @format and @args into a string that
350  * is then written to @stream.
351  *
352  * See the documentation of g_output_stream_write_all() about the
353  * behavior of the actual write operation.
354  *
355  * Note that partial writes cannot be properly checked with this
356  * function due to the variable length of the written string, if you
357  * need precise control over partial write failures, you need to
358  * create you own printf()-like wrapper around g_output_stream_write()
359  * or g_output_stream_write_all().
360  *
361  * Since: 2.40
362  *
363  * Returns: %TRUE on success, %FALSE if there was an error
364  **/
365 gboolean
366 g_output_stream_vprintf (GOutputStream  *stream,
367                          gsize          *bytes_written,
368                          GCancellable   *cancellable,
369                          GError        **error,
370                          const gchar    *format,
371                          va_list         args)
372 {
373   gchar    *text;
374   gboolean  success;
375
376   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
377   g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (stream), FALSE);
378   g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
379   g_return_val_if_fail (format != NULL, FALSE);
380
381   text = g_strdup_vprintf (format, args);
382   success = g_output_stream_write_all (stream,
383                                        text, strlen (text),
384                                        bytes_written, cancellable, error);
385   g_free (text);
386
387   return success;
388 }
389
390 /**
391  * g_output_stream_write_bytes:
392  * @stream: a #GOutputStream.
393  * @bytes: the #GBytes to write
394  * @cancellable: (allow-none): optional cancellable object
395  * @error: location to store the error occurring, or %NULL to ignore
396  *
397  * A wrapper function for g_output_stream_write() which takes a
398  * #GBytes as input.  This can be more convenient for use by language
399  * bindings or in other cases where the refcounted nature of #GBytes
400  * is helpful over a bare pointer interface.
401  *
402  * However, note that this function may still perform partial writes,
403  * just like g_output_stream_write().  If that occurs, to continue
404  * writing, you will need to create a new #GBytes containing just the
405  * remaining bytes, using g_bytes_new_from_bytes(). Passing the same
406  * #GBytes instance multiple times potentially can result in duplicated
407  * data in the output stream.
408  *
409  * Returns: Number of bytes written, or -1 on error
410  **/
411 gssize
412 g_output_stream_write_bytes (GOutputStream  *stream,
413                              GBytes         *bytes,
414                              GCancellable   *cancellable,
415                              GError        **error)
416 {
417   gsize size;
418   gconstpointer data;
419
420   data = g_bytes_get_data (bytes, &size);
421
422   return g_output_stream_write (stream,
423                                 data, size,
424                                 cancellable,
425                                 error);
426 }
427
428 /**
429  * g_output_stream_flush:
430  * @stream: a #GOutputStream.
431  * @cancellable: (allow-none): optional cancellable object
432  * @error: location to store the error occurring, or %NULL to ignore
433  *
434  * Forces a write of all user-space buffered data for the given
435  * @stream. Will block during the operation. Closing the stream will
436  * implicitly cause a flush.
437  *
438  * This function is optional for inherited classes.
439  * 
440  * If @cancellable is not %NULL, then the operation can be cancelled by
441  * triggering the cancellable object from another thread. If the operation
442  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
443  *
444  * Returns: %TRUE on success, %FALSE on error
445  **/
446 gboolean
447 g_output_stream_flush (GOutputStream  *stream,
448                        GCancellable   *cancellable,
449                        GError        **error)
450 {
451   GOutputStreamClass *class;
452   gboolean res;
453
454   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
455
456   if (!g_output_stream_set_pending (stream, error))
457     return FALSE;
458   
459   class = G_OUTPUT_STREAM_GET_CLASS (stream);
460
461   res = TRUE;
462   if (class->flush)
463     {
464       if (cancellable)
465         g_cancellable_push_current (cancellable);
466       
467       res = class->flush (stream, cancellable, error);
468       
469       if (cancellable)
470         g_cancellable_pop_current (cancellable);
471     }
472   
473   g_output_stream_clear_pending (stream);
474
475   return res;
476 }
477
478 /**
479  * g_output_stream_splice:
480  * @stream: a #GOutputStream.
481  * @source: a #GInputStream.
482  * @flags: a set of #GOutputStreamSpliceFlags.
483  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
484  * @error: a #GError location to store the error occurring, or %NULL to
485  * ignore.
486  *
487  * Splices an input stream into an output stream.
488  *
489  * Returns: a #gssize containing the size of the data spliced, or
490  *     -1 if an error occurred. Note that if the number of bytes
491  *     spliced is greater than %G_MAXSSIZE, then that will be
492  *     returned, and there is no way to determine the actual number
493  *     of bytes spliced.
494  **/
495 gssize
496 g_output_stream_splice (GOutputStream             *stream,
497                         GInputStream              *source,
498                         GOutputStreamSpliceFlags   flags,
499                         GCancellable              *cancellable,
500                         GError                   **error)
501 {
502   GOutputStreamClass *class;
503   gssize bytes_copied;
504
505   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
506   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
507
508   if (g_input_stream_is_closed (source))
509     {
510       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
511                            _("Source stream is already closed"));
512       return -1;
513     }
514
515   if (!g_output_stream_set_pending (stream, error))
516     return -1;
517
518   class = G_OUTPUT_STREAM_GET_CLASS (stream);
519
520   if (cancellable)
521     g_cancellable_push_current (cancellable);
522
523   bytes_copied = class->splice (stream, source, flags, cancellable, error);
524
525   if (cancellable)
526     g_cancellable_pop_current (cancellable);
527
528   g_output_stream_clear_pending (stream);
529
530   return bytes_copied;
531 }
532
533 static gssize
534 g_output_stream_real_splice (GOutputStream             *stream,
535                              GInputStream              *source,
536                              GOutputStreamSpliceFlags   flags,
537                              GCancellable              *cancellable,
538                              GError                   **error)
539 {
540   GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
541   gssize n_read, n_written;
542   gsize bytes_copied;
543   char buffer[8192], *p;
544   gboolean res;
545
546   bytes_copied = 0;
547   if (class->write_fn == NULL)
548     {
549       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
550                            _("Output stream doesn't implement write"));
551       res = FALSE;
552       goto notsupported;
553     }
554
555   res = TRUE;
556   do
557     {
558       n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
559       if (n_read == -1)
560         {
561           res = FALSE;
562           break;
563         }
564
565       if (n_read == 0)
566         break;
567
568       p = buffer;
569       while (n_read > 0)
570         {
571           n_written = class->write_fn (stream, p, n_read, cancellable, error);
572           if (n_written == -1)
573             {
574               res = FALSE;
575               break;
576             }
577
578           p += n_written;
579           n_read -= n_written;
580           bytes_copied += n_written;
581         }
582
583       if (bytes_copied > G_MAXSSIZE)
584         bytes_copied = G_MAXSSIZE;
585     }
586   while (res);
587
588  notsupported:
589   if (!res)
590     error = NULL; /* Ignore further errors */
591
592   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
593     {
594       /* Don't care about errors in source here */
595       g_input_stream_close (source, cancellable, NULL);
596     }
597
598   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
599     {
600       /* But write errors on close are bad! */
601       res = g_output_stream_internal_close (stream, cancellable, error);
602     }
603
604   if (res)
605     return bytes_copied;
606
607   return -1;
608 }
609
610 /* Must always be called inside
611  * g_output_stream_set_pending()/g_output_stream_clear_pending(). */
612 static gboolean
613 g_output_stream_internal_close (GOutputStream  *stream,
614                                 GCancellable   *cancellable,
615                                 GError        **error)
616 {
617   GOutputStreamClass *class;
618   gboolean res;
619
620   if (stream->priv->closed)
621     return TRUE;
622
623   class = G_OUTPUT_STREAM_GET_CLASS (stream);
624
625   stream->priv->closing = TRUE;
626
627   if (cancellable)
628     g_cancellable_push_current (cancellable);
629
630   if (class->flush)
631     res = class->flush (stream, cancellable, error);
632   else
633     res = TRUE;
634
635   if (!res)
636     {
637       /* flushing caused the error that we want to return,
638        * but we still want to close the underlying stream if possible
639        */
640       if (class->close_fn)
641         class->close_fn (stream, cancellable, NULL);
642     }
643   else
644     {
645       res = TRUE;
646       if (class->close_fn)
647         res = class->close_fn (stream, cancellable, error);
648     }
649
650   if (cancellable)
651     g_cancellable_pop_current (cancellable);
652
653   stream->priv->closing = FALSE;
654   stream->priv->closed = TRUE;
655
656   return res;
657 }
658
659 /**
660  * g_output_stream_close:
661  * @stream: A #GOutputStream.
662  * @cancellable: (allow-none): optional cancellable object
663  * @error: location to store the error occurring, or %NULL to ignore
664  *
665  * Closes the stream, releasing resources related to it.
666  *
667  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
668  * Closing a stream multiple times will not return an error.
669  *
670  * Closing a stream will automatically flush any outstanding buffers in the
671  * stream.
672  *
673  * Streams will be automatically closed when the last reference
674  * is dropped, but you might want to call this function to make sure 
675  * resources are released as early as possible.
676  *
677  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
678  * open after the stream is closed. See the documentation for the individual
679  * stream for details.
680  *
681  * On failure the first error that happened will be reported, but the close
682  * operation will finish as much as possible. A stream that failed to
683  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
684  * is important to check and report the error to the user, otherwise
685  * there might be a loss of data as all data might not be written.
686  * 
687  * If @cancellable is not %NULL, then the operation can be cancelled by
688  * triggering the cancellable object from another thread. If the operation
689  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
690  * Cancelling a close will still leave the stream closed, but there some streams
691  * can use a faster close that doesn't block to e.g. check errors. On
692  * cancellation (as with any error) there is no guarantee that all written
693  * data will reach the target. 
694  *
695  * Returns: %TRUE on success, %FALSE on failure
696  **/
697 gboolean
698 g_output_stream_close (GOutputStream  *stream,
699                        GCancellable   *cancellable,
700                        GError        **error)
701 {
702   gboolean res;
703
704   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
705
706   if (stream->priv->closed)
707     return TRUE;
708
709   if (!g_output_stream_set_pending (stream, error))
710     return FALSE;
711
712   res = g_output_stream_internal_close (stream, cancellable, error);
713
714   g_output_stream_clear_pending (stream);
715   
716   return res;
717 }
718
719 static void
720 async_ready_write_callback_wrapper (GObject      *source_object,
721                                     GAsyncResult *res,
722                                     gpointer      user_data)
723 {
724   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
725   GOutputStreamClass *class;
726   GTask *task = user_data;
727   gssize nwrote;
728   GError *error = NULL;
729
730   g_output_stream_clear_pending (stream);
731   
732   if (g_async_result_legacy_propagate_error (res, &error))
733     nwrote = -1;
734   else
735     {
736       class = G_OUTPUT_STREAM_GET_CLASS (stream);
737       nwrote = class->write_finish (stream, res, &error);
738     }
739
740   if (nwrote >= 0)
741     g_task_return_int (task, nwrote);
742   else
743     g_task_return_error (task, error);
744   g_object_unref (task);
745 }
746
747 /**
748  * g_output_stream_write_async:
749  * @stream: A #GOutputStream.
750  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
751  * @count: the number of bytes to write
752  * @io_priority: the io priority of the request.
753  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
754  * @callback: (scope async): callback to call when the request is satisfied
755  * @user_data: (closure): the data to pass to callback function
756  *
757  * Request an asynchronous write of @count bytes from @buffer into 
758  * the stream. When the operation is finished @callback will be called.
759  * You can then call g_output_stream_write_finish() to get the result of the 
760  * operation.
761  *
762  * During an async request no other sync and async calls are allowed, 
763  * and will result in %G_IO_ERROR_PENDING errors. 
764  *
765  * A value of @count larger than %G_MAXSSIZE will cause a 
766  * %G_IO_ERROR_INVALID_ARGUMENT error.
767  *
768  * On success, the number of bytes written will be passed to the
769  * @callback. It is not an error if this is not the same as the 
770  * requested size, as it can happen e.g. on a partial I/O error, 
771  * but generally we try to write as many bytes as requested. 
772  *
773  * You are guaranteed that this method will never fail with
774  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
775  * method will just wait until this changes.
776  *
777  * Any outstanding I/O request with higher priority (lower numerical 
778  * value) will be executed before an outstanding request with lower 
779  * priority. Default priority is %G_PRIORITY_DEFAULT.
780  *
781  * The asyncronous methods have a default fallback that uses threads 
782  * to implement asynchronicity, so they are optional for inheriting 
783  * classes. However, if you override one you must override all.
784  *
785  * For the synchronous, blocking version of this function, see 
786  * g_output_stream_write().
787  *
788  * Note that no copy of @buffer will be made, so it must stay valid
789  * until @callback is called. See g_output_stream_write_bytes_async()
790  * for a #GBytes version that will automatically hold a reference to
791  * the contents (without copying) for the duration of the call.
792  */
793 void
794 g_output_stream_write_async (GOutputStream       *stream,
795                              const void          *buffer,
796                              gsize                count,
797                              int                  io_priority,
798                              GCancellable        *cancellable,
799                              GAsyncReadyCallback  callback,
800                              gpointer             user_data)
801 {
802   GOutputStreamClass *class;
803   GError *error = NULL;
804   GTask *task;
805
806   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
807   g_return_if_fail (buffer != NULL);
808
809   task = g_task_new (stream, cancellable, callback, user_data);
810   g_task_set_source_tag (task, g_output_stream_write_async);
811   g_task_set_priority (task, io_priority);
812
813   if (count == 0)
814     {
815       g_task_return_int (task, 0);
816       g_object_unref (task);
817       return;
818     }
819
820   if (((gssize) count) < 0)
821     {
822       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
823                                _("Too large count value passed to %s"),
824                                G_STRFUNC);
825       g_object_unref (task);
826       return;
827     }
828
829   if (!g_output_stream_set_pending (stream, &error))
830     {
831       g_task_return_error (task, error);
832       g_object_unref (task);
833       return;
834     }
835   
836   class = G_OUTPUT_STREAM_GET_CLASS (stream);
837
838   class->write_async (stream, buffer, count, io_priority, cancellable,
839                       async_ready_write_callback_wrapper, task);
840 }
841
842 /**
843  * g_output_stream_write_finish:
844  * @stream: a #GOutputStream.
845  * @result: a #GAsyncResult.
846  * @error: a #GError location to store the error occurring, or %NULL to 
847  * ignore.
848  * 
849  * Finishes a stream write operation.
850  * 
851  * Returns: a #gssize containing the number of bytes written to the stream.
852  **/
853 gssize
854 g_output_stream_write_finish (GOutputStream  *stream,
855                               GAsyncResult   *result,
856                               GError        **error)
857 {
858   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
859   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
860   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_write_async), FALSE);
861
862   /* @result is always the GTask created by g_output_stream_write_async();
863    * we called class->write_finish() from async_ready_write_callback_wrapper.
864    */
865   return g_task_propagate_int (G_TASK (result), error);
866 }
867
868 typedef struct
869 {
870   const guint8 *buffer;
871   gsize to_write;
872   gsize bytes_written;
873 } AsyncWriteAll;
874
875 static void
876 free_async_write_all (gpointer data)
877 {
878   g_slice_free (AsyncWriteAll, data);
879 }
880
881 static void
882 write_all_callback (GObject      *stream,
883                     GAsyncResult *result,
884                     gpointer      user_data)
885 {
886   GTask *task = user_data;
887   AsyncWriteAll *data = g_task_get_task_data (task);
888
889   if (result)
890     {
891       GError *error = NULL;
892       gssize nwritten;
893
894       nwritten = g_output_stream_write_finish (G_OUTPUT_STREAM (stream), result, &error);
895
896       if (nwritten == -1)
897         {
898           g_task_return_error (task, error);
899           g_object_unref (task);
900           return;
901         }
902
903       g_assert_cmpint (nwritten, <=, data->to_write);
904       g_warn_if_fail (nwritten > 0);
905
906       data->to_write -= nwritten;
907       data->bytes_written += nwritten;
908     }
909
910   if (data->to_write == 0)
911     {
912       g_task_return_boolean (task, TRUE);
913       g_object_unref (task);
914     }
915
916   else
917     g_output_stream_write_async (G_OUTPUT_STREAM (stream),
918                                  data->buffer + data->bytes_written,
919                                  data->to_write,
920                                  g_task_get_priority (task),
921                                  g_task_get_cancellable (task),
922                                  write_all_callback, task);
923 }
924
925 static void
926 write_all_async_thread (GTask        *task,
927                         gpointer      source_object,
928                         gpointer      task_data,
929                         GCancellable *cancellable)
930 {
931   GOutputStream *stream = source_object;
932   AsyncWriteAll *data = task_data;
933   GError *error = NULL;
934
935   if (g_output_stream_write_all (stream, data->buffer, data->to_write, &data->bytes_written,
936                                  g_task_get_cancellable (task), &error))
937     g_task_return_boolean (task, TRUE);
938   else
939     g_task_return_error (task, error);
940 }
941
942 /**
943  * g_output_stream_write_all_async:
944  * @stream: A #GOutputStream
945  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write
946  * @count: the number of bytes to write
947  * @io_priority: the io priority of the request
948  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore
949  * @callback: (scope async): callback to call when the request is satisfied
950  * @user_data: (closure): the data to pass to callback function
951  *
952  * Request an asynchronous write of @count bytes from @buffer into
953  * the stream. When the operation is finished @callback will be called.
954  * You can then call g_output_stream_write_all_finish() to get the result of the
955  * operation.
956  *
957  * This is the asynchronous version of g_output_stream_write_all().
958  *
959  * Call g_output_stream_write_all_finish() to collect the result.
960  *
961  * Any outstanding I/O request with higher priority (lower numerical
962  * value) will be executed before an outstanding request with lower
963  * priority. Default priority is %G_PRIORITY_DEFAULT.
964  *
965  * Note that no copy of @buffer will be made, so it must stay valid
966  * until @callback is called.
967  *
968  * Since: 2.44
969  */
970 void
971 g_output_stream_write_all_async (GOutputStream       *stream,
972                                  const void          *buffer,
973                                  gsize                count,
974                                  int                  io_priority,
975                                  GCancellable        *cancellable,
976                                  GAsyncReadyCallback  callback,
977                                  gpointer             user_data)
978 {
979   AsyncWriteAll *data;
980   GTask *task;
981
982   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
983   g_return_if_fail (buffer != NULL || count == 0);
984
985   task = g_task_new (stream, cancellable, callback, user_data);
986   data = g_slice_new0 (AsyncWriteAll);
987   data->buffer = buffer;
988   data->to_write = count;
989
990   g_task_set_task_data (task, data, free_async_write_all);
991   g_task_set_priority (task, io_priority);
992
993   /* If async writes are going to be handled via the threadpool anyway
994    * then we may as well do it with a single dispatch instead of
995    * bouncing in and out.
996    */
997   if (g_output_stream_async_write_is_via_threads (stream))
998     {
999       g_task_run_in_thread (task, write_all_async_thread);
1000       g_object_unref (task);
1001     }
1002   else
1003     write_all_callback (G_OBJECT (stream), NULL, task);
1004 }
1005
1006 /**
1007  * g_output_stream_write_all_finish:
1008  * @stream: a #GOutputStream
1009  * @result: a #GAsyncResult
1010  * @bytes_written: (out): location to store the number of bytes that was written to the stream
1011  * @error: a #GError location to store the error occurring, or %NULL to ignore.
1012  *
1013  * Finishes an asynchronous stream write operation started with
1014  * g_output_stream_write_all_async().
1015  *
1016  * As a special exception to the normal conventions for functions that
1017  * use #GError, if this function returns %FALSE (and sets @error) then
1018  * @bytes_written will be set to the number of bytes that were
1019  * successfully written before the error was encountered.  This
1020  * functionality is only available from C.  If you need it from another
1021  * language then you must write your own loop around
1022  * g_output_stream_write_async().
1023  *
1024  * Returns: %TRUE on success, %FALSE if there was an error
1025  *
1026  * Since: 2.44
1027  **/
1028 gboolean
1029 g_output_stream_write_all_finish (GOutputStream  *stream,
1030                                   GAsyncResult   *result,
1031                                   gsize          *bytes_written,
1032                                   GError        **error)
1033 {
1034   GTask *task;
1035
1036   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1037   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1038
1039   task = G_TASK (result);
1040
1041   if (bytes_written)
1042     {
1043       AsyncWriteAll *data = (AsyncWriteAll *)g_task_get_task_data (task);
1044
1045       *bytes_written = data->bytes_written;
1046     }
1047
1048   return g_task_propagate_boolean (task, error);
1049 }
1050
1051 static void
1052 write_bytes_callback (GObject      *stream,
1053                       GAsyncResult *result,
1054                       gpointer      user_data)
1055 {
1056   GTask *task = user_data;
1057   GError *error = NULL;
1058   gssize nwrote;
1059
1060   nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream),
1061                                          result, &error);
1062   if (nwrote == -1)
1063     g_task_return_error (task, error);
1064   else
1065     g_task_return_int (task, nwrote);
1066   g_object_unref (task);
1067 }
1068
1069 /**
1070  * g_output_stream_write_bytes_async:
1071  * @stream: A #GOutputStream.
1072  * @bytes: The bytes to write
1073  * @io_priority: the io priority of the request.
1074  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
1075  * @callback: (scope async): callback to call when the request is satisfied
1076  * @user_data: (closure): the data to pass to callback function
1077  *
1078  * This function is similar to g_output_stream_write_async(), but
1079  * takes a #GBytes as input.  Due to the refcounted nature of #GBytes,
1080  * this allows the stream to avoid taking a copy of the data.
1081  *
1082  * However, note that this function may still perform partial writes,
1083  * just like g_output_stream_write_async(). If that occurs, to continue
1084  * writing, you will need to create a new #GBytes containing just the
1085  * remaining bytes, using g_bytes_new_from_bytes(). Passing the same
1086  * #GBytes instance multiple times potentially can result in duplicated
1087  * data in the output stream.
1088  *
1089  * For the synchronous, blocking version of this function, see
1090  * g_output_stream_write_bytes().
1091  **/
1092 void
1093 g_output_stream_write_bytes_async (GOutputStream       *stream,
1094                                    GBytes              *bytes,
1095                                    int                  io_priority,
1096                                    GCancellable        *cancellable,
1097                                    GAsyncReadyCallback  callback,
1098                                    gpointer             user_data)
1099 {
1100   GTask *task;
1101   gsize size;
1102   gconstpointer data;
1103
1104   data = g_bytes_get_data (bytes, &size);
1105
1106   task = g_task_new (stream, cancellable, callback, user_data);
1107   g_task_set_task_data (task, g_bytes_ref (bytes),
1108                         (GDestroyNotify) g_bytes_unref);
1109
1110   g_output_stream_write_async (stream,
1111                                data, size,
1112                                io_priority,
1113                                cancellable,
1114                                write_bytes_callback,
1115                                task);
1116 }
1117
1118 /**
1119  * g_output_stream_write_bytes_finish:
1120  * @stream: a #GOutputStream.
1121  * @result: a #GAsyncResult.
1122  * @error: a #GError location to store the error occurring, or %NULL to
1123  * ignore.
1124  *
1125  * Finishes a stream write-from-#GBytes operation.
1126  *
1127  * Returns: a #gssize containing the number of bytes written to the stream.
1128  **/
1129 gssize
1130 g_output_stream_write_bytes_finish (GOutputStream  *stream,
1131                                     GAsyncResult   *result,
1132                                     GError        **error)
1133 {
1134   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
1135   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
1136
1137   return g_task_propagate_int (G_TASK (result), error);
1138 }
1139
1140 static void
1141 async_ready_splice_callback_wrapper (GObject      *source_object,
1142                                      GAsyncResult *res,
1143                                      gpointer     _data)
1144 {
1145   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1146   GOutputStreamClass *class;
1147   GTask *task = _data;
1148   gssize nspliced;
1149   GError *error = NULL;
1150
1151   g_output_stream_clear_pending (stream);
1152   
1153   if (g_async_result_legacy_propagate_error (res, &error))
1154     nspliced = -1;
1155   else
1156     {
1157       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1158       nspliced = class->splice_finish (stream, res, &error);
1159     }
1160
1161   if (nspliced >= 0)
1162     g_task_return_int (task, nspliced);
1163   else
1164     g_task_return_error (task, error);
1165   g_object_unref (task);
1166 }
1167
1168 /**
1169  * g_output_stream_splice_async:
1170  * @stream: a #GOutputStream.
1171  * @source: a #GInputStream. 
1172  * @flags: a set of #GOutputStreamSpliceFlags.
1173  * @io_priority: the io priority of the request.
1174  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
1175  * @callback: (scope async): a #GAsyncReadyCallback. 
1176  * @user_data: (closure): user data passed to @callback.
1177  * 
1178  * Splices a stream asynchronously.
1179  * When the operation is finished @callback will be called.
1180  * You can then call g_output_stream_splice_finish() to get the 
1181  * result of the operation.
1182  *
1183  * For the synchronous, blocking version of this function, see 
1184  * g_output_stream_splice().
1185  **/
1186 void
1187 g_output_stream_splice_async (GOutputStream            *stream,
1188                               GInputStream             *source,
1189                               GOutputStreamSpliceFlags  flags,
1190                               int                       io_priority,
1191                               GCancellable             *cancellable,
1192                               GAsyncReadyCallback       callback,
1193                               gpointer                  user_data)
1194 {
1195   GOutputStreamClass *class;
1196   GTask *task;
1197   GError *error = NULL;
1198
1199   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1200   g_return_if_fail (G_IS_INPUT_STREAM (source));
1201
1202   task = g_task_new (stream, cancellable, callback, user_data);
1203   g_task_set_source_tag (task, g_output_stream_splice_async);
1204   g_task_set_priority (task, io_priority);
1205   g_task_set_task_data (task, g_object_ref (source), g_object_unref);
1206
1207   if (g_input_stream_is_closed (source))
1208     {
1209       g_task_return_new_error (task,
1210                                G_IO_ERROR, G_IO_ERROR_CLOSED,
1211                                _("Source stream is already closed"));
1212       g_object_unref (task);
1213       return;
1214     }
1215   
1216   if (!g_output_stream_set_pending (stream, &error))
1217     {
1218       g_task_return_error (task, error);
1219       g_object_unref (task);
1220       return;
1221     }
1222
1223   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1224
1225   class->splice_async (stream, source, flags, io_priority, cancellable,
1226                        async_ready_splice_callback_wrapper, task);
1227 }
1228
1229 /**
1230  * g_output_stream_splice_finish:
1231  * @stream: a #GOutputStream.
1232  * @result: a #GAsyncResult.
1233  * @error: a #GError location to store the error occurring, or %NULL to 
1234  * ignore.
1235  *
1236  * Finishes an asynchronous stream splice operation.
1237  * 
1238  * Returns: a #gssize of the number of bytes spliced. Note that if the
1239  *     number of bytes spliced is greater than %G_MAXSSIZE, then that
1240  *     will be returned, and there is no way to determine the actual
1241  *     number of bytes spliced.
1242  **/
1243 gssize
1244 g_output_stream_splice_finish (GOutputStream  *stream,
1245                                GAsyncResult   *result,
1246                                GError        **error)
1247 {
1248   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1249   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1250   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_splice_async), FALSE);
1251
1252   /* @result is always the GTask created by g_output_stream_splice_async();
1253    * we called class->splice_finish() from async_ready_splice_callback_wrapper.
1254    */
1255   return g_task_propagate_int (G_TASK (result), error);
1256 }
1257
1258 static void
1259 async_ready_flush_callback_wrapper (GObject      *source_object,
1260                                     GAsyncResult *res,
1261                                     gpointer      user_data)
1262 {
1263   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1264   GOutputStreamClass *class;
1265   GTask *task = user_data;
1266   gboolean flushed;
1267   GError *error = NULL;
1268
1269   g_output_stream_clear_pending (stream);
1270   
1271   if (g_async_result_legacy_propagate_error (res, &error))
1272     flushed = FALSE;
1273   else
1274     {
1275       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1276       flushed = class->flush_finish (stream, res, &error);
1277     }
1278
1279   if (flushed)
1280     g_task_return_boolean (task, TRUE);
1281   else
1282     g_task_return_error (task, error);
1283   g_object_unref (task);
1284 }
1285
1286 /**
1287  * g_output_stream_flush_async:
1288  * @stream: a #GOutputStream.
1289  * @io_priority: the io priority of the request.
1290  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
1291  * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied
1292  * @user_data: (closure): the data to pass to callback function
1293  * 
1294  * Forces an asynchronous write of all user-space buffered data for
1295  * the given @stream.
1296  * For behaviour details see g_output_stream_flush().
1297  *
1298  * When the operation is finished @callback will be 
1299  * called. You can then call g_output_stream_flush_finish() to get the 
1300  * result of the operation.
1301  **/
1302 void
1303 g_output_stream_flush_async (GOutputStream       *stream,
1304                              int                  io_priority,
1305                              GCancellable        *cancellable,
1306                              GAsyncReadyCallback  callback,
1307                              gpointer             user_data)
1308 {
1309   GOutputStreamClass *class;
1310   GTask *task;
1311   GError *error = NULL;
1312
1313   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1314
1315   task = g_task_new (stream, cancellable, callback, user_data);
1316   g_task_set_source_tag (task, g_output_stream_flush_async);
1317   g_task_set_priority (task, io_priority);
1318
1319   if (!g_output_stream_set_pending (stream, &error))
1320     {
1321       g_task_return_error (task, error);
1322       g_object_unref (task);
1323       return;
1324     }
1325
1326   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1327   
1328   if (class->flush_async == NULL)
1329     {
1330       g_task_return_boolean (task, TRUE);
1331       g_object_unref (task);
1332       return;
1333     }
1334       
1335   class->flush_async (stream, io_priority, cancellable,
1336                       async_ready_flush_callback_wrapper, task);
1337 }
1338
1339 /**
1340  * g_output_stream_flush_finish:
1341  * @stream: a #GOutputStream.
1342  * @result: a GAsyncResult.
1343  * @error: a #GError location to store the error occurring, or %NULL to 
1344  * ignore.
1345  * 
1346  * Finishes flushing an output stream.
1347  * 
1348  * Returns: %TRUE if flush operation succeeded, %FALSE otherwise.
1349  **/
1350 gboolean
1351 g_output_stream_flush_finish (GOutputStream  *stream,
1352                               GAsyncResult   *result,
1353                               GError        **error)
1354 {
1355   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1356   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1357   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_flush_async), FALSE);
1358
1359   /* @result is always the GTask created by g_output_stream_flush_async();
1360    * we called class->flush_finish() from async_ready_flush_callback_wrapper.
1361    */
1362   return g_task_propagate_boolean (G_TASK (result), error);
1363 }
1364
1365
1366 static void
1367 async_ready_close_callback_wrapper (GObject      *source_object,
1368                                     GAsyncResult *res,
1369                                     gpointer      user_data)
1370 {
1371   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1372   GOutputStreamClass *class;
1373   GTask *task = user_data;
1374   GError *error = g_task_get_task_data (task);
1375
1376   stream->priv->closing = FALSE;
1377   stream->priv->closed = TRUE;
1378
1379   if (!error && !g_async_result_legacy_propagate_error (res, &error))
1380     {
1381       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1382
1383       class->close_finish (stream, res,
1384                            error ? NULL : &error);
1385     }
1386
1387   if (error != NULL)
1388     g_task_return_error (task, error);
1389   else
1390     g_task_return_boolean (task, TRUE);
1391   g_object_unref (task);
1392 }
1393
1394 static void
1395 async_ready_close_flushed_callback_wrapper (GObject      *source_object,
1396                                             GAsyncResult *res,
1397                                             gpointer      user_data)
1398 {
1399   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1400   GOutputStreamClass *class;
1401   GTask *task = user_data;
1402   GError *error = NULL;
1403
1404   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1405
1406   if (!g_async_result_legacy_propagate_error (res, &error))
1407     {
1408       class->flush_finish (stream, res, &error);
1409     }
1410
1411   /* propagate the possible error */
1412   if (error)
1413     g_task_set_task_data (task, error, NULL);
1414
1415   /* we still close, even if there was a flush error */
1416   class->close_async (stream,
1417                       g_task_get_priority (task),
1418                       g_task_get_cancellable (task),
1419                       async_ready_close_callback_wrapper, task);
1420 }
1421
1422 static void
1423 real_close_async_cb (GObject      *source_object,
1424                      GAsyncResult *res,
1425                      gpointer      user_data)
1426 {
1427   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1428   GTask *task = user_data;
1429   GError *error = NULL;
1430   gboolean ret;
1431
1432   g_output_stream_clear_pending (stream);
1433
1434   ret = g_output_stream_internal_close_finish (stream, res, &error);
1435
1436   if (error != NULL)
1437     g_task_return_error (task, error);
1438   else
1439     g_task_return_boolean (task, ret);
1440
1441   g_object_unref (task);
1442 }
1443
1444 /**
1445  * g_output_stream_close_async:
1446  * @stream: A #GOutputStream.
1447  * @io_priority: the io priority of the request.
1448  * @cancellable: (allow-none): optional cancellable object
1449  * @callback: (scope async): callback to call when the request is satisfied
1450  * @user_data: (closure): the data to pass to callback function
1451  *
1452  * Requests an asynchronous close of the stream, releasing resources 
1453  * related to it. When the operation is finished @callback will be 
1454  * called. You can then call g_output_stream_close_finish() to get 
1455  * the result of the operation.
1456  *
1457  * For behaviour details see g_output_stream_close().
1458  *
1459  * The asyncronous methods have a default fallback that uses threads 
1460  * to implement asynchronicity, so they are optional for inheriting 
1461  * classes. However, if you override one you must override all.
1462  **/
1463 void
1464 g_output_stream_close_async (GOutputStream       *stream,
1465                              int                  io_priority,
1466                              GCancellable        *cancellable,
1467                              GAsyncReadyCallback  callback,
1468                              gpointer             user_data)
1469 {
1470   GTask *task;
1471   GError *error = NULL;
1472
1473   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1474   
1475   task = g_task_new (stream, cancellable, callback, user_data);
1476   g_task_set_source_tag (task, g_output_stream_close_async);
1477   g_task_set_priority (task, io_priority);
1478
1479   if (!g_output_stream_set_pending (stream, &error))
1480     {
1481       g_task_return_error (task, error);
1482       g_object_unref (task);
1483       return;
1484     }
1485
1486   g_output_stream_internal_close_async (stream, io_priority, cancellable,
1487                                         real_close_async_cb, task);
1488 }
1489
1490 /* Must always be called inside
1491  * g_output_stream_set_pending()/g_output_stream_clear_pending().
1492  */
1493 void
1494 g_output_stream_internal_close_async (GOutputStream       *stream,
1495                                       int                  io_priority,
1496                                       GCancellable        *cancellable,
1497                                       GAsyncReadyCallback  callback,
1498                                       gpointer             user_data)
1499 {
1500   GOutputStreamClass *class;
1501   GTask *task;
1502
1503   task = g_task_new (stream, cancellable, callback, user_data);
1504   g_task_set_source_tag (task, g_output_stream_internal_close_async);
1505   g_task_set_priority (task, io_priority);
1506
1507   if (stream->priv->closed)
1508     {
1509       g_task_return_boolean (task, TRUE);
1510       g_object_unref (task);
1511       return;
1512     }
1513
1514   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1515   stream->priv->closing = TRUE;
1516
1517   /* Call close_async directly if there is no need to flush, or if the flush
1518      can be done sync (in the output stream async close thread) */
1519   if (class->flush_async == NULL ||
1520       (class->flush_async == g_output_stream_real_flush_async &&
1521        (class->flush == NULL || class->close_async == g_output_stream_real_close_async)))
1522     {
1523       class->close_async (stream, io_priority, cancellable,
1524                           async_ready_close_callback_wrapper, task);
1525     }
1526   else
1527     {
1528       /* First do an async flush, then do the async close in the callback
1529          wrapper (see async_ready_close_flushed_callback_wrapper) */
1530       class->flush_async (stream, io_priority, cancellable,
1531                           async_ready_close_flushed_callback_wrapper, task);
1532     }
1533 }
1534
1535 static gboolean
1536 g_output_stream_internal_close_finish (GOutputStream  *stream,
1537                                        GAsyncResult   *result,
1538                                        GError        **error)
1539 {
1540   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1541   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1542   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_internal_close_async), FALSE);
1543
1544   return g_task_propagate_boolean (G_TASK (result), error);
1545 }
1546
1547 /**
1548  * g_output_stream_close_finish:
1549  * @stream: a #GOutputStream.
1550  * @result: a #GAsyncResult.
1551  * @error: a #GError location to store the error occurring, or %NULL to 
1552  * ignore.
1553  * 
1554  * Closes an output stream.
1555  * 
1556  * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
1557  **/
1558 gboolean
1559 g_output_stream_close_finish (GOutputStream  *stream,
1560                               GAsyncResult   *result,
1561                               GError        **error)
1562 {
1563   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1564   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1565   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_close_async), FALSE);
1566
1567   /* @result is always the GTask created by g_output_stream_close_async();
1568    * we called class->close_finish() from async_ready_close_callback_wrapper.
1569    */
1570   return g_task_propagate_boolean (G_TASK (result), error);
1571 }
1572
1573 /**
1574  * g_output_stream_is_closed:
1575  * @stream: a #GOutputStream.
1576  * 
1577  * Checks if an output stream has already been closed.
1578  * 
1579  * Returns: %TRUE if @stream is closed. %FALSE otherwise. 
1580  **/
1581 gboolean
1582 g_output_stream_is_closed (GOutputStream *stream)
1583 {
1584   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1585   
1586   return stream->priv->closed;
1587 }
1588
1589 /**
1590  * g_output_stream_is_closing:
1591  * @stream: a #GOutputStream.
1592  *
1593  * Checks if an output stream is being closed. This can be
1594  * used inside e.g. a flush implementation to see if the
1595  * flush (or other i/o operation) is called from within
1596  * the closing operation.
1597  *
1598  * Returns: %TRUE if @stream is being closed. %FALSE otherwise.
1599  *
1600  * Since: 2.24
1601  **/
1602 gboolean
1603 g_output_stream_is_closing (GOutputStream *stream)
1604 {
1605   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1606
1607   return stream->priv->closing;
1608 }
1609
1610 /**
1611  * g_output_stream_has_pending:
1612  * @stream: a #GOutputStream.
1613  * 
1614  * Checks if an ouput stream has pending actions.
1615  * 
1616  * Returns: %TRUE if @stream has pending actions. 
1617  **/
1618 gboolean
1619 g_output_stream_has_pending (GOutputStream *stream)
1620 {
1621   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1622   
1623   return stream->priv->pending;
1624 }
1625
1626 /**
1627  * g_output_stream_set_pending:
1628  * @stream: a #GOutputStream.
1629  * @error: a #GError location to store the error occurring, or %NULL to 
1630  * ignore.
1631  * 
1632  * Sets @stream to have actions pending. If the pending flag is
1633  * already set or @stream is closed, it will return %FALSE and set
1634  * @error.
1635  *
1636  * Returns: %TRUE if pending was previously unset and is now set.
1637  **/
1638 gboolean
1639 g_output_stream_set_pending (GOutputStream *stream,
1640                              GError **error)
1641 {
1642   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1643   
1644   if (stream->priv->closed)
1645     {
1646       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1647                            _("Stream is already closed"));
1648       return FALSE;
1649     }
1650   
1651   if (stream->priv->pending)
1652     {
1653       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1654                            /* Translators: This is an error you get if there is
1655                             * already an operation running against this stream when
1656                             * you try to start one */
1657                            _("Stream has outstanding operation"));
1658       return FALSE;
1659     }
1660   
1661   stream->priv->pending = TRUE;
1662   return TRUE;
1663 }
1664
1665 /**
1666  * g_output_stream_clear_pending:
1667  * @stream: output stream
1668  * 
1669  * Clears the pending flag on @stream.
1670  **/
1671 void
1672 g_output_stream_clear_pending (GOutputStream *stream)
1673 {
1674   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1675   
1676   stream->priv->pending = FALSE;
1677 }
1678
1679 /**
1680  * g_output_stream_async_write_is_via_threads:
1681  * @stream: a #GOutputStream.
1682  *
1683  * Checks if an ouput stream's write_async function uses threads.
1684  *
1685  * Returns: %TRUE if @stream's write_async function uses threads.
1686  **/
1687 gboolean
1688 g_output_stream_async_write_is_via_threads (GOutputStream *stream)
1689 {
1690   GOutputStreamClass *class;
1691
1692   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1693
1694   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1695
1696   return (class->write_async == g_output_stream_real_write_async &&
1697       !(G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
1698         g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
1699 }
1700
1701
1702 /********************************************
1703  *   Default implementation of async ops    *
1704  ********************************************/
1705
1706 typedef struct {
1707   const void         *buffer;
1708   gsize               count_requested;
1709   gssize              count_written;
1710 } WriteData;
1711
1712 static void
1713 free_write_data (WriteData *op)
1714 {
1715   g_slice_free (WriteData, op);
1716 }
1717
1718 static void
1719 write_async_thread (GTask        *task,
1720                     gpointer      source_object,
1721                     gpointer      task_data,
1722                     GCancellable *cancellable)
1723 {
1724   GOutputStream *stream = source_object;
1725   WriteData *op = task_data;
1726   GOutputStreamClass *class;
1727   GError *error = NULL;
1728   gssize count_written;
1729
1730   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1731   count_written = class->write_fn (stream, op->buffer, op->count_requested,
1732                                    cancellable, &error);
1733   if (count_written == -1)
1734     g_task_return_error (task, error);
1735   else
1736     g_task_return_int (task, count_written);
1737 }
1738
1739 static void write_async_pollable (GPollableOutputStream *stream,
1740                                   GTask                 *task);
1741
1742 static gboolean
1743 write_async_pollable_ready (GPollableOutputStream *stream,
1744                             gpointer               user_data)
1745 {
1746   GTask *task = user_data;
1747
1748   write_async_pollable (stream, task);
1749   return FALSE;
1750 }
1751
1752 static void
1753 write_async_pollable (GPollableOutputStream *stream,
1754                       GTask                 *task)
1755 {
1756   GError *error = NULL;
1757   WriteData *op = g_task_get_task_data (task);
1758   gssize count_written;
1759
1760   if (g_task_return_error_if_cancelled (task))
1761     return;
1762
1763   count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
1764     write_nonblocking (stream, op->buffer, op->count_requested, &error);
1765
1766   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1767     {
1768       GSource *source;
1769
1770       g_error_free (error);
1771
1772       source = g_pollable_output_stream_create_source (stream,
1773                                                        g_task_get_cancellable (task));
1774       g_task_attach_source (task, source,
1775                             (GSourceFunc) write_async_pollable_ready);
1776       g_source_unref (source);
1777       return;
1778     }
1779
1780   if (count_written == -1)
1781     g_task_return_error (task, error);
1782   else
1783     g_task_return_int (task, count_written);
1784 }
1785
1786 static void
1787 g_output_stream_real_write_async (GOutputStream       *stream,
1788                                   const void          *buffer,
1789                                   gsize                count,
1790                                   int                  io_priority,
1791                                   GCancellable        *cancellable,
1792                                   GAsyncReadyCallback  callback,
1793                                   gpointer             user_data)
1794 {
1795   GTask *task;
1796   WriteData *op;
1797
1798   op = g_slice_new0 (WriteData);
1799   task = g_task_new (stream, cancellable, callback, user_data);
1800   g_task_set_check_cancellable (task, FALSE);
1801   g_task_set_task_data (task, op, (GDestroyNotify) free_write_data);
1802   op->buffer = buffer;
1803   op->count_requested = count;
1804
1805   if (!g_output_stream_async_write_is_via_threads (stream))
1806     write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
1807   else
1808     g_task_run_in_thread (task, write_async_thread);
1809   g_object_unref (task);
1810 }
1811
1812 static gssize
1813 g_output_stream_real_write_finish (GOutputStream  *stream,
1814                                    GAsyncResult   *result,
1815                                    GError        **error)
1816 {
1817   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1818
1819   return g_task_propagate_int (G_TASK (result), error);
1820 }
1821
1822 typedef struct {
1823   GInputStream *source;
1824   GOutputStreamSpliceFlags flags;
1825   gssize n_read;
1826   gssize n_written;
1827   gsize bytes_copied;
1828   GError *error;
1829   guint8 *buffer;
1830 } SpliceData;
1831
1832 static void
1833 free_splice_data (SpliceData *op)
1834 {
1835   g_clear_pointer (&op->buffer, g_free);
1836   g_object_unref (op->source);
1837   g_clear_error (&op->error);
1838   g_free (op);
1839 }
1840
1841 static void
1842 real_splice_async_complete_cb (GTask *task)
1843 {
1844   SpliceData *op = g_task_get_task_data (task);
1845
1846   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE &&
1847       !g_input_stream_is_closed (op->source))
1848     return;
1849
1850   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET &&
1851       !g_output_stream_is_closed (g_task_get_source_object (task)))
1852     return;
1853
1854   if (op->error != NULL)
1855     {
1856       g_task_return_error (task, op->error);
1857       op->error = NULL;
1858     }
1859   else
1860     {
1861       g_task_return_int (task, op->bytes_copied);
1862     }
1863
1864   g_object_unref (task);
1865 }
1866
1867 static void
1868 real_splice_async_close_input_cb (GObject      *source,
1869                                   GAsyncResult *res,
1870                                   gpointer      user_data)
1871 {
1872   GTask *task = user_data;
1873
1874   g_input_stream_close_finish (G_INPUT_STREAM (source), res, NULL);
1875
1876   real_splice_async_complete_cb (task);
1877 }
1878
1879 static void
1880 real_splice_async_close_output_cb (GObject      *source,
1881                                    GAsyncResult *res,
1882                                    gpointer      user_data)
1883 {
1884   GTask *task = G_TASK (user_data);
1885   SpliceData *op = g_task_get_task_data (task);
1886   GError **error = (op->error == NULL) ? &op->error : NULL;
1887
1888   g_output_stream_internal_close_finish (G_OUTPUT_STREAM (source), res, error);
1889
1890   real_splice_async_complete_cb (task);
1891 }
1892
1893 static void
1894 real_splice_async_complete (GTask *task)
1895 {
1896   SpliceData *op = g_task_get_task_data (task);
1897   gboolean done = TRUE;
1898
1899   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
1900     {
1901       done = FALSE;
1902       g_input_stream_close_async (op->source, g_task_get_priority (task),
1903                                   g_task_get_cancellable (task),
1904                                   real_splice_async_close_input_cb, task);
1905     }
1906
1907   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
1908     {
1909       done = FALSE;
1910       g_output_stream_internal_close_async (g_task_get_source_object (task),
1911                                             g_task_get_priority (task),
1912                                             g_task_get_cancellable (task),
1913                                             real_splice_async_close_output_cb,
1914                                             task);
1915     }
1916
1917   if (done)
1918     real_splice_async_complete_cb (task);
1919 }
1920
1921 static void real_splice_async_read_cb (GObject      *source,
1922                                        GAsyncResult *res,
1923                                        gpointer      user_data);
1924
1925 static void
1926 real_splice_async_write_cb (GObject      *source,
1927                             GAsyncResult *res,
1928                             gpointer      user_data)
1929 {
1930   GOutputStreamClass *class;
1931   GTask *task = G_TASK (user_data);
1932   SpliceData *op = g_task_get_task_data (task);
1933   gssize ret;
1934
1935   class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
1936
1937   ret = class->write_finish (G_OUTPUT_STREAM (source), res, &op->error);
1938
1939   if (ret == -1)
1940     {
1941       real_splice_async_complete (task);
1942       return;
1943     }
1944
1945   op->n_written += ret;
1946   op->bytes_copied += ret;
1947   if (op->bytes_copied > G_MAXSSIZE)
1948     op->bytes_copied = G_MAXSSIZE;
1949
1950   if (op->n_written < op->n_read)
1951     {
1952       class->write_async (g_task_get_source_object (task),
1953                           op->buffer + op->n_written,
1954                           op->n_read - op->n_written,
1955                           g_task_get_priority (task),
1956                           g_task_get_cancellable (task),
1957                           real_splice_async_write_cb, task);
1958       return;
1959     }
1960
1961   g_input_stream_read_async (op->source, op->buffer, 8192,
1962                              g_task_get_priority (task),
1963                              g_task_get_cancellable (task),
1964                              real_splice_async_read_cb, task);
1965 }
1966
1967 static void
1968 real_splice_async_read_cb (GObject      *source,
1969                            GAsyncResult *res,
1970                            gpointer      user_data)
1971 {
1972   GOutputStreamClass *class;
1973   GTask *task = G_TASK (user_data);
1974   SpliceData *op = g_task_get_task_data (task);
1975   gssize ret;
1976
1977   class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
1978
1979   ret = g_input_stream_read_finish (op->source, res, &op->error);
1980   if (ret == -1 || ret == 0)
1981     {
1982       real_splice_async_complete (task);
1983       return;
1984     }
1985
1986   op->n_read = ret;
1987   op->n_written = 0;
1988
1989   class->write_async (g_task_get_source_object (task), op->buffer,
1990                       op->n_read, g_task_get_priority (task),
1991                       g_task_get_cancellable (task),
1992                       real_splice_async_write_cb, task);
1993 }
1994
1995 static void
1996 splice_async_thread (GTask        *task,
1997                      gpointer      source_object,
1998                      gpointer      task_data,
1999                      GCancellable *cancellable)
2000 {
2001   GOutputStream *stream = source_object;
2002   SpliceData *op = task_data;
2003   GOutputStreamClass *class;
2004   GError *error = NULL;
2005   gssize bytes_copied;
2006
2007   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2008   
2009   bytes_copied = class->splice (stream,
2010                                 op->source,
2011                                 op->flags,
2012                                 cancellable,
2013                                 &error);
2014   if (bytes_copied == -1)
2015     g_task_return_error (task, error);
2016   else
2017     g_task_return_int (task, bytes_copied);
2018 }
2019
2020 static void
2021 g_output_stream_real_splice_async (GOutputStream             *stream,
2022                                    GInputStream              *source,
2023                                    GOutputStreamSpliceFlags   flags,
2024                                    int                        io_priority,
2025                                    GCancellable              *cancellable,
2026                                    GAsyncReadyCallback        callback,
2027                                    gpointer                   user_data)
2028 {
2029   GTask *task;
2030   SpliceData *op;
2031
2032   op = g_new0 (SpliceData, 1);
2033   task = g_task_new (stream, cancellable, callback, user_data);
2034   g_task_set_task_data (task, op, (GDestroyNotify)free_splice_data);
2035   op->flags = flags;
2036   op->source = g_object_ref (source);
2037
2038   if (g_input_stream_async_read_is_via_threads (source) &&
2039       g_output_stream_async_write_is_via_threads (stream))
2040     {
2041       g_task_run_in_thread (task, splice_async_thread);
2042       g_object_unref (task);
2043     }
2044   else
2045     {
2046       op->buffer = g_malloc (8192);
2047       g_input_stream_read_async (op->source, op->buffer, 8192,
2048                                  g_task_get_priority (task),
2049                                  g_task_get_cancellable (task),
2050                                  real_splice_async_read_cb, task);
2051     }
2052 }
2053
2054 static gssize
2055 g_output_stream_real_splice_finish (GOutputStream  *stream,
2056                                     GAsyncResult   *result,
2057                                     GError        **error)
2058 {
2059   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2060
2061   return g_task_propagate_int (G_TASK (result), error);
2062 }
2063
2064
2065 static void
2066 flush_async_thread (GTask        *task,
2067                     gpointer      source_object,
2068                     gpointer      task_data,
2069                     GCancellable *cancellable)
2070 {
2071   GOutputStream *stream = source_object;
2072   GOutputStreamClass *class;
2073   gboolean result;
2074   GError *error = NULL;
2075
2076   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2077   result = TRUE;
2078   if (class->flush)
2079     result = class->flush (stream, cancellable, &error);
2080
2081   if (result)
2082     g_task_return_boolean (task, TRUE);
2083   else
2084     g_task_return_error (task, error);
2085 }
2086
2087 static void
2088 g_output_stream_real_flush_async (GOutputStream       *stream,
2089                                   int                  io_priority,
2090                                   GCancellable        *cancellable,
2091                                   GAsyncReadyCallback  callback,
2092                                   gpointer             user_data)
2093 {
2094   GTask *task;
2095
2096   task = g_task_new (stream, cancellable, callback, user_data);
2097   g_task_set_priority (task, io_priority);
2098   g_task_run_in_thread (task, flush_async_thread);
2099   g_object_unref (task);
2100 }
2101
2102 static gboolean
2103 g_output_stream_real_flush_finish (GOutputStream  *stream,
2104                                    GAsyncResult   *result,
2105                                    GError        **error)
2106 {
2107   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2108
2109   return g_task_propagate_boolean (G_TASK (result), error);
2110 }
2111
2112 static void
2113 close_async_thread (GTask        *task,
2114                     gpointer      source_object,
2115                     gpointer      task_data,
2116                     GCancellable *cancellable)
2117 {
2118   GOutputStream *stream = source_object;
2119   GOutputStreamClass *class;
2120   GError *error = NULL;
2121   gboolean result = TRUE;
2122
2123   class = G_OUTPUT_STREAM_GET_CLASS (stream);
2124
2125   /* Do a flush here if there is a flush function, and we did not have to do
2126    * an async flush before (see g_output_stream_close_async)
2127    */
2128   if (class->flush != NULL &&
2129       (class->flush_async == NULL ||
2130        class->flush_async == g_output_stream_real_flush_async))
2131     {
2132       result = class->flush (stream, cancellable, &error);
2133     }
2134
2135   /* Auto handling of cancelation disabled, and ignore
2136      cancellation, since we want to close things anyway, although
2137      possibly in a quick-n-dirty way. At least we never want to leak
2138      open handles */
2139
2140   if (class->close_fn)
2141     {
2142       /* Make sure to close, even if the flush failed (see sync close) */
2143       if (!result)
2144         class->close_fn (stream, cancellable, NULL);
2145       else
2146         result = class->close_fn (stream, cancellable, &error);
2147     }
2148
2149   if (result)
2150     g_task_return_boolean (task, TRUE);
2151   else
2152     g_task_return_error (task, error);
2153 }
2154
2155 static void
2156 g_output_stream_real_close_async (GOutputStream       *stream,
2157                                   int                  io_priority,
2158                                   GCancellable        *cancellable,
2159                                   GAsyncReadyCallback  callback,
2160                                   gpointer             user_data)
2161 {
2162   GTask *task;
2163
2164   task = g_task_new (stream, cancellable, callback, user_data);
2165   g_task_set_priority (task, io_priority);
2166   g_task_run_in_thread (task, close_async_thread);
2167   g_object_unref (task);
2168 }
2169
2170 static gboolean
2171 g_output_stream_real_close_finish (GOutputStream  *stream,
2172                                    GAsyncResult   *result,
2173                                    GError        **error)
2174 {
2175   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
2176
2177   return g_task_propagate_boolean (G_TASK (result), error);
2178 }