Fix several recently-introduced bugs in g_output_stream_write_async()
[platform/upstream/glib.git] / gio / goutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24 #include "goutputstream.h"
25 #include "gcancellable.h"
26 #include "gasyncresult.h"
27 #include "gsimpleasyncresult.h"
28 #include "ginputstream.h"
29 #include "gioerror.h"
30 #include "glibintl.h"
31 #include "gpollableoutputstream.h"
32
33 /**
34  * SECTION:goutputstream
35  * @short_description: Base class for implementing streaming output
36  * @include: gio/gio.h
37  *
38  * #GOutputStream has functions to write to a stream (g_output_stream_write()),
39  * to close a stream (g_output_stream_close()) and to flush pending writes
40  * (g_output_stream_flush()). 
41  *
42  * To copy the content of an input stream to an output stream without 
43  * manually handling the reads and writes, use g_output_stream_splice(). 
44  *
45  * All of these functions have async variants too.
46  **/
47
/* Registers GOutputStream as an abstract type derived from G_TYPE_OBJECT. */
G_DEFINE_ABSTRACT_TYPE (GOutputStream, g_output_stream, G_TYPE_OBJECT);

/* Per-instance private state of a GOutputStream. */
struct _GOutputStreamPrivate {
  /* Set once a close has been carried out; later closes return early. */
  guint closed : 1;
  /* NOTE(review): presumably toggled by g_output_stream_set_pending() /
   * g_output_stream_clear_pending(), which are defined outside this
   * excerpt -- confirm. */
  guint pending : 1;
  /* TRUE while a close is in progress, reset when the close finishes. */
  guint closing : 1;
  /* Caller-supplied callback of the async operation in flight; the
   * async_ready_*_wrapper functions invoke it on completion. */
  GAsyncReadyCallback outstanding_callback;
};
56
/* Forward declarations of the default virtual-method implementations that
 * g_output_stream_class_init() installs, plus the internal close helper
 * shared by the close and splice paths. */
static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
                                                    GInputStream              *source,
                                                    GOutputStreamSpliceFlags   flags,
                                                    GCancellable              *cancellable,
                                                    GError                   **error);
static void     g_output_stream_real_write_async   (GOutputStream             *stream,
                                                    const void                *buffer,
                                                    gsize                      count,
                                                    int                        io_priority,
                                                    GCancellable              *cancellable,
                                                    GAsyncReadyCallback        callback,
                                                    gpointer                   data);
static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
                                                    GAsyncResult              *result,
                                                    GError                   **error);
static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
                                                    GInputStream              *source,
                                                    GOutputStreamSpliceFlags   flags,
                                                    int                        io_priority,
                                                    GCancellable              *cancellable,
                                                    GAsyncReadyCallback        callback,
                                                    gpointer                   data);
static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
                                                    GAsyncResult              *result,
                                                    GError                   **error);
static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
                                                    int                        io_priority,
                                                    GCancellable              *cancellable,
                                                    GAsyncReadyCallback        callback,
                                                    gpointer                   data);
static gboolean g_output_stream_real_flush_finish  (GOutputStream             *stream,
                                                    GAsyncResult              *result,
                                                    GError                   **error);
static void     g_output_stream_real_close_async   (GOutputStream             *stream,
                                                    int                        io_priority,
                                                    GCancellable              *cancellable,
                                                    GAsyncReadyCallback        callback,
                                                    gpointer                   data);
static gboolean g_output_stream_real_close_finish  (GOutputStream             *stream,
                                                    GAsyncResult              *result,
                                                    GError                   **error);
static gboolean _g_output_stream_close_internal    (GOutputStream             *stream,
                                                    GCancellable              *cancellable,
                                                    GError                   **error);
101
102 static void
103 g_output_stream_finalize (GObject *object)
104 {
105   G_OBJECT_CLASS (g_output_stream_parent_class)->finalize (object);
106 }
107
108 static void
109 g_output_stream_dispose (GObject *object)
110 {
111   GOutputStream *stream;
112
113   stream = G_OUTPUT_STREAM (object);
114   
115   if (!stream->priv->closed)
116     g_output_stream_close (stream, NULL, NULL);
117
118   G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object);
119 }
120
121 static void
122 g_output_stream_class_init (GOutputStreamClass *klass)
123 {
124   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
125   
126   g_type_class_add_private (klass, sizeof (GOutputStreamPrivate));
127   
128   gobject_class->finalize = g_output_stream_finalize;
129   gobject_class->dispose = g_output_stream_dispose;
130
131   klass->splice = g_output_stream_real_splice;
132   
133   klass->write_async = g_output_stream_real_write_async;
134   klass->write_finish = g_output_stream_real_write_finish;
135   klass->splice_async = g_output_stream_real_splice_async;
136   klass->splice_finish = g_output_stream_real_splice_finish;
137   klass->flush_async = g_output_stream_real_flush_async;
138   klass->flush_finish = g_output_stream_real_flush_finish;
139   klass->close_async = g_output_stream_real_close_async;
140   klass->close_finish = g_output_stream_real_close_finish;
141 }
142
143 static void
144 g_output_stream_init (GOutputStream *stream)
145 {
146   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
147                                               G_TYPE_OUTPUT_STREAM,
148                                               GOutputStreamPrivate);
149 }
150
151 /**
152  * g_output_stream_write:
153  * @stream: a #GOutputStream.
154  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
155  * @count: the number of bytes to write
156  * @cancellable: (allow-none): optional cancellable object
157  * @error: location to store the error occurring, or %NULL to ignore
158  *
159  * Tries to write @count bytes from @buffer into the stream. Will block
160  * during the operation.
161  * 
162  * If count is 0, returns 0 and does nothing. A value of @count
163  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
164  *
165  * On success, the number of bytes written to the stream is returned.
166  * It is not an error if this is not the same as the requested size, as it
167  * can happen e.g. on a partial I/O error, or if there is not enough
168  * storage in the stream. All writes block until at least one byte
169  * is written or an error occurs; 0 is never returned (unless
170  * @count is 0).
171  * 
172  * If @cancellable is not %NULL, then the operation can be cancelled by
173  * triggering the cancellable object from another thread. If the operation
174  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
175  * operation was partially finished when the operation was cancelled the
176  * partial result will be returned, without an error.
177  *
178  * On error -1 is returned and @error is set accordingly.
179  * 
180  * Virtual: write_fn
181  *
182  * Return value: Number of bytes written, or -1 on error
183  **/
184 gssize
185 g_output_stream_write (GOutputStream  *stream,
186                        const void     *buffer,
187                        gsize           count,
188                        GCancellable   *cancellable,
189                        GError        **error)
190 {
191   GOutputStreamClass *class;
192   gssize res;
193
194   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
195   g_return_val_if_fail (buffer != NULL, 0);
196
197   if (count == 0)
198     return 0;
199   
200   if (((gssize) count) < 0)
201     {
202       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
203                    _("Too large count value passed to %s"), G_STRFUNC);
204       return -1;
205     }
206
207   class = G_OUTPUT_STREAM_GET_CLASS (stream);
208
209   if (class->write_fn == NULL) 
210     {
211       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
212                            _("Output stream doesn't implement write"));
213       return -1;
214     }
215   
216   if (!g_output_stream_set_pending (stream, error))
217     return -1;
218   
219   if (cancellable)
220     g_cancellable_push_current (cancellable);
221   
222   res = class->write_fn (stream, buffer, count, cancellable, error);
223   
224   if (cancellable)
225     g_cancellable_pop_current (cancellable);
226   
227   g_output_stream_clear_pending (stream);
228
229   return res; 
230 }
231
232 /**
233  * g_output_stream_write_all:
234  * @stream: a #GOutputStream.
235  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
236  * @count: the number of bytes to write
237  * @bytes_written: (out): location to store the number of bytes that was 
238  *     written to the stream
239  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
240  * @error: location to store the error occurring, or %NULL to ignore
241  *
242  * Tries to write @count bytes from @buffer into the stream. Will block
243  * during the operation.
244  * 
245  * This function is similar to g_output_stream_write(), except it tries to
246  * write as many bytes as requested, only stopping on an error.
247  *
248  * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
249  * is set to @count.
250  * 
251  * If there is an error during the operation %FALSE is returned and @error
252  * is set to indicate the error status, @bytes_written is updated to contain
253  * the number of bytes written into the stream before the error occurred.
254  *
255  * Return value: %TRUE on success, %FALSE if there was an error
256  **/
257 gboolean
258 g_output_stream_write_all (GOutputStream  *stream,
259                            const void     *buffer,
260                            gsize           count,
261                            gsize          *bytes_written,
262                            GCancellable   *cancellable,
263                            GError        **error)
264 {
265   gsize _bytes_written;
266   gssize res;
267
268   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
269   g_return_val_if_fail (buffer != NULL, FALSE);
270
271   _bytes_written = 0;
272   while (_bytes_written < count)
273     {
274       res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
275                                    cancellable, error);
276       if (res == -1)
277         {
278           if (bytes_written)
279             *bytes_written = _bytes_written;
280           return FALSE;
281         }
282       
283       if (res == 0)
284         g_warning ("Write returned zero without error");
285
286       _bytes_written += res;
287     }
288   
289   if (bytes_written)
290     *bytes_written = _bytes_written;
291
292   return TRUE;
293 }
294
295 /**
296  * g_output_stream_flush:
297  * @stream: a #GOutputStream.
298  * @cancellable: (allow-none): optional cancellable object
299  * @error: location to store the error occurring, or %NULL to ignore
300  *
301  * Forces a write of all user-space buffered data for the given
302  * @stream. Will block during the operation. Closing the stream will
303  * implicitly cause a flush.
304  *
305  * This function is optional for inherited classes.
306  * 
307  * If @cancellable is not %NULL, then the operation can be cancelled by
308  * triggering the cancellable object from another thread. If the operation
309  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
310  *
311  * Return value: %TRUE on success, %FALSE on error
312  **/
313 gboolean
314 g_output_stream_flush (GOutputStream  *stream,
315                        GCancellable   *cancellable,
316                        GError        **error)
317 {
318   GOutputStreamClass *class;
319   gboolean res;
320
321   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
322
323   if (!g_output_stream_set_pending (stream, error))
324     return FALSE;
325   
326   class = G_OUTPUT_STREAM_GET_CLASS (stream);
327
328   res = TRUE;
329   if (class->flush)
330     {
331       if (cancellable)
332         g_cancellable_push_current (cancellable);
333       
334       res = class->flush (stream, cancellable, error);
335       
336       if (cancellable)
337         g_cancellable_pop_current (cancellable);
338     }
339   
340   g_output_stream_clear_pending (stream);
341
342   return res;
343 }
344
345 /**
346  * g_output_stream_splice:
347  * @stream: a #GOutputStream.
348  * @source: a #GInputStream.
349  * @flags: a set of #GOutputStreamSpliceFlags.
350  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
351  * @error: a #GError location to store the error occurring, or %NULL to
352  * ignore.
353  *
354  * Splices an input stream into an output stream.
355  *
356  * Returns: a #gssize containing the size of the data spliced, or
357  *     -1 if an error occurred. Note that if the number of bytes
358  *     spliced is greater than %G_MAXSSIZE, then that will be
359  *     returned, and there is no way to determine the actual number
360  *     of bytes spliced.
361  **/
362 gssize
363 g_output_stream_splice (GOutputStream             *stream,
364                         GInputStream              *source,
365                         GOutputStreamSpliceFlags   flags,
366                         GCancellable              *cancellable,
367                         GError                   **error)
368 {
369   GOutputStreamClass *class;
370   gssize bytes_copied;
371
372   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
373   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
374
375   if (g_input_stream_is_closed (source))
376     {
377       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
378                            _("Source stream is already closed"));
379       return -1;
380     }
381
382   if (!g_output_stream_set_pending (stream, error))
383     return -1;
384
385   class = G_OUTPUT_STREAM_GET_CLASS (stream);
386
387   if (cancellable)
388     g_cancellable_push_current (cancellable);
389
390   bytes_copied = class->splice (stream, source, flags, cancellable, error);
391
392   if (cancellable)
393     g_cancellable_pop_current (cancellable);
394
395   g_output_stream_clear_pending (stream);
396
397   return bytes_copied;
398 }
399
400 static gssize
401 g_output_stream_real_splice (GOutputStream             *stream,
402                              GInputStream              *source,
403                              GOutputStreamSpliceFlags   flags,
404                              GCancellable              *cancellable,
405                              GError                   **error)
406 {
407   GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
408   gssize n_read, n_written;
409   gsize bytes_copied;
410   char buffer[8192], *p;
411   gboolean res;
412
413   bytes_copied = 0;
414   if (class->write_fn == NULL)
415     {
416       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
417                            _("Output stream doesn't implement write"));
418       res = FALSE;
419       goto notsupported;
420     }
421
422   res = TRUE;
423   do
424     {
425       n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
426       if (n_read == -1)
427         {
428           res = FALSE;
429           break;
430         }
431
432       if (n_read == 0)
433         break;
434
435       p = buffer;
436       while (n_read > 0)
437         {
438           n_written = class->write_fn (stream, p, n_read, cancellable, error);
439           if (n_written == -1)
440             {
441               res = FALSE;
442               break;
443             }
444
445           p += n_written;
446           n_read -= n_written;
447           bytes_copied += n_written;
448         }
449
450       if (bytes_copied > G_MAXSSIZE)
451         bytes_copied = G_MAXSSIZE;
452     }
453   while (res);
454
455  notsupported:
456   if (!res)
457     error = NULL; /* Ignore further errors */
458
459   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
460     {
461       /* Don't care about errors in source here */
462       g_input_stream_close (source, cancellable, NULL);
463     }
464
465   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
466     {
467       /* But write errors on close are bad! */
468       res = _g_output_stream_close_internal (stream, cancellable, error);
469     }
470
471   if (res)
472     return bytes_copied;
473
474   return -1;
475 }
476
477 /* Must always be called inside
478  * g_output_stream_set_pending()/g_output_stream_clear_pending(). */
479 static gboolean
480 _g_output_stream_close_internal (GOutputStream  *stream,
481                                  GCancellable   *cancellable,
482                                  GError        **error)
483 {
484   GOutputStreamClass *class;
485   gboolean res;
486
487   if (stream->priv->closed)
488     return TRUE;
489
490   class = G_OUTPUT_STREAM_GET_CLASS (stream);
491
492   stream->priv->closing = TRUE;
493
494   if (cancellable)
495     g_cancellable_push_current (cancellable);
496
497   if (class->flush)
498     res = class->flush (stream, cancellable, error);
499   else
500     res = TRUE;
501
502   if (!res)
503     {
504       /* flushing caused the error that we want to return,
505        * but we still want to close the underlying stream if possible
506        */
507       if (class->close_fn)
508         class->close_fn (stream, cancellable, NULL);
509     }
510   else
511     {
512       res = TRUE;
513       if (class->close_fn)
514         res = class->close_fn (stream, cancellable, error);
515     }
516
517   if (cancellable)
518     g_cancellable_pop_current (cancellable);
519
520   stream->priv->closing = FALSE;
521   stream->priv->closed = TRUE;
522
523   return res;
524 }
525
526 /**
527  * g_output_stream_close:
528  * @stream: A #GOutputStream.
529  * @cancellable: (allow-none): optional cancellable object
530  * @error: location to store the error occurring, or %NULL to ignore
531  *
532  * Closes the stream, releasing resources related to it.
533  *
534  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
535  * Closing a stream multiple times will not return an error.
536  *
537  * Closing a stream will automatically flush any outstanding buffers in the
538  * stream.
539  *
540  * Streams will be automatically closed when the last reference
541  * is dropped, but you might want to call this function to make sure 
542  * resources are released as early as possible.
543  *
544  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
545  * open after the stream is closed. See the documentation for the individual
546  * stream for details.
547  *
548  * On failure the first error that happened will be reported, but the close
549  * operation will finish as much as possible. A stream that failed to
550  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
551  * is important to check and report the error to the user, otherwise
552  * there might be a loss of data as all data might not be written.
553  * 
554  * If @cancellable is not %NULL, then the operation can be cancelled by
555  * triggering the cancellable object from another thread. If the operation
556  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
557  * Cancelling a close will still leave the stream closed, but some streams
558  * can use a faster close that doesn't block to e.g. check errors. On
559  * cancellation (as with any error) there is no guarantee that all written
560  * data will reach the target. 
561  *
562  * Return value: %TRUE on success, %FALSE on failure
563  **/
564 gboolean
565 g_output_stream_close (GOutputStream  *stream,
566                        GCancellable   *cancellable,
567                        GError        **error)
568 {
569   gboolean res;
570
571   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
572
573   if (stream->priv->closed)
574     return TRUE;
575
576   if (!g_output_stream_set_pending (stream, error))
577     return FALSE;
578
579   res = _g_output_stream_close_internal (stream, cancellable, error);
580
581   g_output_stream_clear_pending (stream);
582   
583   return res;
584 }
585
586 static void
587 async_ready_callback_wrapper (GObject      *source_object,
588                               GAsyncResult *res,
589                               gpointer      user_data)
590 {
591   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
592
593   g_output_stream_clear_pending (stream);
594   if (stream->priv->outstanding_callback)
595     (*stream->priv->outstanding_callback) (source_object, res, user_data);
596   g_object_unref (stream);
597 }
598
/* State carried through the two-step (flush, then close) async close.
 * Released with g_slice_free() by async_ready_close_callback_wrapper(). */
typedef struct {
  /* Priority handed on to the class's close_async. */
  gint io_priority;
  /* Optional cancellable handed on to close_async; unreffed (if set) when
   * the close completes. */
  GCancellable *cancellable;
  /* Error from the flush step, if any; either handed to the caller's
   * callback or freed once the close completes. */
  GError *flush_error;
  /* The caller's user data, passed to the outstanding callback. */
  gpointer user_data;
} CloseUserData;
605
606 static void
607 async_ready_close_callback_wrapper (GObject      *source_object,
608                                     GAsyncResult *res,
609                                     gpointer      user_data)
610 {
611   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
612   CloseUserData *data = user_data;
613
614   stream->priv->closing = FALSE;
615   stream->priv->closed = TRUE;
616
617   g_output_stream_clear_pending (stream);
618
619   if (stream->priv->outstanding_callback)
620     {
621       if (data->flush_error != NULL)
622         {
623           GSimpleAsyncResult *err;
624
625           err = g_simple_async_result_new_take_error (source_object,
626                                                       stream->priv->outstanding_callback,
627                                                       data->user_data,
628                                                       data->flush_error);
629           data->flush_error = NULL;
630
631           (*stream->priv->outstanding_callback) (source_object,
632                                                  G_ASYNC_RESULT (err),
633                                                  data->user_data);
634           g_object_unref (err);
635         }
636       else
637         {
638           (*stream->priv->outstanding_callback) (source_object,
639                                                  res,
640                                                  data->user_data);
641         }
642     }
643
644   g_object_unref (stream);
645
646   if (data->cancellable)
647     g_object_unref (data->cancellable);
648
649   if (data->flush_error)
650     g_error_free (data->flush_error);
651
652   g_slice_free (CloseUserData, data);
653 }
654
655 static void
656 async_ready_close_flushed_callback_wrapper (GObject      *source_object,
657                                             GAsyncResult *res,
658                                             gpointer      user_data)
659 {
660   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
661   GOutputStreamClass *class;
662   CloseUserData *data = user_data;
663   GSimpleAsyncResult *simple;
664
665   /* propagate the possible error */
666   if (G_IS_SIMPLE_ASYNC_RESULT (res))
667     {
668       simple = G_SIMPLE_ASYNC_RESULT (res);
669       g_simple_async_result_propagate_error (simple, &data->flush_error);
670     }
671
672   class = G_OUTPUT_STREAM_GET_CLASS (stream);
673
674   /* we still close, even if there was a flush error */
675   class->close_async (stream, data->io_priority, data->cancellable,
676                       async_ready_close_callback_wrapper, user_data);
677 }
678
679 /**
680  * g_output_stream_write_async:
681  * @stream: A #GOutputStream.
682  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
683  * @count: the number of bytes to write
684  * @io_priority: the io priority of the request.
685  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
686  * @callback: (scope async): callback to call when the request is satisfied
687  * @user_data: (closure): the data to pass to callback function
688  *
689  * Request an asynchronous write of @count bytes from @buffer into 
690  * the stream. When the operation is finished @callback will be called.
691  * You can then call g_output_stream_write_finish() to get the result of the 
692  * operation.
693  *
694  * During an async request no other sync and async calls are allowed, 
695  * and will result in %G_IO_ERROR_PENDING errors. 
696  *
697  * A value of @count larger than %G_MAXSSIZE will cause a 
698  * %G_IO_ERROR_INVALID_ARGUMENT error.
699  *
700  * On success, the number of bytes written will be passed to the
701  * @callback. It is not an error if this is not the same as the 
702  * requested size, as it can happen e.g. on a partial I/O error, 
703  * but generally we try to write as many bytes as requested. 
704  *
705  * You are guaranteed that this method will never fail with
706  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
707  * method will just wait until this changes.
708  *
709  * Any outstanding I/O request with higher priority (lower numerical 
710  * value) will be executed before an outstanding request with lower 
711  * priority. Default priority is %G_PRIORITY_DEFAULT.
712  *
713  * The asynchronous methods have a default fallback that uses threads
714  * to implement asynchronicity, so they are optional for inheriting 
715  * classes. However, if you override one you must override all.
716  *
717  * For the synchronous, blocking version of this function, see 
718  * g_output_stream_write().
719  **/
720 void
721 g_output_stream_write_async (GOutputStream       *stream,
722                              const void          *buffer,
723                              gsize                count,
724                              int                  io_priority,
725                              GCancellable        *cancellable,
726                              GAsyncReadyCallback  callback,
727                              gpointer             user_data)
728 {
729   GOutputStreamClass *class;
730   GSimpleAsyncResult *simple;
731   GError *error = NULL;
732
733   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
734   g_return_if_fail (buffer != NULL);
735
736   if (count == 0)
737     {
738       simple = g_simple_async_result_new (G_OBJECT (stream),
739                                           callback,
740                                           user_data,
741                                           g_output_stream_write_async);
742       g_simple_async_result_complete_in_idle (simple);
743       g_object_unref (simple);
744       return;
745     }
746
747   if (((gssize) count) < 0)
748     {
749       g_simple_async_report_error_in_idle (G_OBJECT (stream),
750                                            callback,
751                                            user_data,
752                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
753                                            _("Too large count value passed to %s"),
754                                            G_STRFUNC);
755       return;
756     }
757
758   if (!g_output_stream_set_pending (stream, &error))
759     {
760       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
761                                             callback,
762                                             user_data,
763                                             error);
764       return;
765     }
766   
767   class = G_OUTPUT_STREAM_GET_CLASS (stream);
768
769   stream->priv->outstanding_callback = callback;
770   g_object_ref (stream);
771   class->write_async (stream, buffer, count, io_priority, cancellable,
772                       async_ready_callback_wrapper, user_data);
773 }
774
775 /**
776  * g_output_stream_write_finish:
777  * @stream: a #GOutputStream.
778  * @result: a #GAsyncResult.
779  * @error: a #GError location to store the error occurring, or %NULL to 
780  * ignore.
781  * 
782  * Finishes a stream write operation.
783  * 
784  * Returns: a #gssize containing the number of bytes written to the stream.
785  **/
786 gssize
787 g_output_stream_write_finish (GOutputStream  *stream,
788                               GAsyncResult   *result,
789                               GError        **error)
790 {
791   GSimpleAsyncResult *simple;
792   GOutputStreamClass *class;
793
794   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
795   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
796
797   if (G_IS_SIMPLE_ASYNC_RESULT (result))
798     {
799       simple = G_SIMPLE_ASYNC_RESULT (result);
800       if (g_simple_async_result_propagate_error (simple, error))
801         return -1;
802
803       /* Special case writes of 0 bytes */
804       if (g_simple_async_result_get_source_tag (simple) == g_output_stream_write_async)
805         return 0;
806     }
807   
808   class = G_OUTPUT_STREAM_GET_CLASS (stream);
809   return class->write_finish (stream, result, error);
810 }
811
/* State carried through an async splice.  Released with g_free() by
 * async_ready_splice_callback_wrapper(). */
typedef struct {
  /* Input stream being spliced; unreffed when the splice completes. */
  GInputStream *source;
  /* The caller's user data, passed to @callback. */
  gpointer user_data;
  /* The caller's completion callback; may be NULL. */
  GAsyncReadyCallback callback;
} SpliceUserData;
817
818 static void
819 async_ready_splice_callback_wrapper (GObject      *source_object,
820                                      GAsyncResult *res,
821                                      gpointer     _data)
822 {
823   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
824   SpliceUserData *data = _data;
825   
826   g_output_stream_clear_pending (stream);
827   
828   if (data->callback)
829     (*data->callback) (source_object, res, data->user_data);
830   
831   g_object_unref (stream);
832   g_object_unref (data->source);
833   g_free (data);
834 }
835
836 /**
837  * g_output_stream_splice_async:
838  * @stream: a #GOutputStream.
839  * @source: a #GInputStream. 
840  * @flags: a set of #GOutputStreamSpliceFlags.
841  * @io_priority: the io priority of the request.
842  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
843  * @callback: (scope async): a #GAsyncReadyCallback. 
844  * @user_data: (closure): user data passed to @callback.
845  * 
846  * Splices a stream asynchronously.
847  * When the operation is finished @callback will be called.
848  * You can then call g_output_stream_splice_finish() to get the 
849  * result of the operation.
850  *
851  * For the synchronous, blocking version of this function, see 
852  * g_output_stream_splice().
853  **/
854 void
855 g_output_stream_splice_async (GOutputStream            *stream,
856                               GInputStream             *source,
857                               GOutputStreamSpliceFlags  flags,
858                               int                       io_priority,
859                               GCancellable             *cancellable,
860                               GAsyncReadyCallback       callback,
861                               gpointer                  user_data)
862 {
863   GOutputStreamClass *class;
864   SpliceUserData *data;
865   GError *error = NULL;
866
867   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
868   g_return_if_fail (G_IS_INPUT_STREAM (source));
869
870   if (g_input_stream_is_closed (source))
871     {
872       g_simple_async_report_error_in_idle (G_OBJECT (stream),
873                                            callback,
874                                            user_data,
875                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
876                                            _("Source stream is already closed"));
877       return;
878     }
879   
880   if (!g_output_stream_set_pending (stream, &error))
881     {
882       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
883                                             callback,
884                                             user_data,
885                                             error);
886       return;
887     }
888
889   class = G_OUTPUT_STREAM_GET_CLASS (stream);
890
891   data = g_new0 (SpliceUserData, 1);
892   data->callback = callback;
893   data->user_data = user_data;
894   data->source = g_object_ref (source);
895   
896   g_object_ref (stream);
897   class->splice_async (stream, source, flags, io_priority, cancellable,
898                       async_ready_splice_callback_wrapper, data);
899 }
900
901 /**
902  * g_output_stream_splice_finish:
903  * @stream: a #GOutputStream.
904  * @result: a #GAsyncResult.
905  * @error: a #GError location to store the error occurring, or %NULL to 
906  * ignore.
907  *
908  * Finishes an asynchronous stream splice operation.
909  * 
910  * Returns: a #gssize of the number of bytes spliced. Note that if the
911  *     number of bytes spliced is greater than %G_MAXSSIZE, then that
912  *     will be returned, and there is no way to determine the actual
913  *     number of bytes spliced.
914  **/
915 gssize
916 g_output_stream_splice_finish (GOutputStream  *stream,
917                                GAsyncResult   *result,
918                                GError        **error)
919 {
920   GSimpleAsyncResult *simple;
921   GOutputStreamClass *class;
922
923   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
924   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
925
926   if (G_IS_SIMPLE_ASYNC_RESULT (result))
927     {
928       simple = G_SIMPLE_ASYNC_RESULT (result);
929       if (g_simple_async_result_propagate_error (simple, error))
930         return -1;
931     }
932   
933   class = G_OUTPUT_STREAM_GET_CLASS (stream);
934   return class->splice_finish (stream, result, error);
935 }
936
937 /**
938  * g_output_stream_flush_async:
939  * @stream: a #GOutputStream.
940  * @io_priority: the io priority of the request.
941  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
942  * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied
943  * @user_data: (closure): the data to pass to callback function
944  * 
945  * Forces an asynchronous write of all user-space buffered data for
946  * the given @stream.
947  * For behaviour details see g_output_stream_flush().
948  *
949  * When the operation is finished @callback will be 
950  * called. You can then call g_output_stream_flush_finish() to get the 
951  * result of the operation.
952  **/
953 void
954 g_output_stream_flush_async (GOutputStream       *stream,
955                              int                  io_priority,
956                              GCancellable        *cancellable,
957                              GAsyncReadyCallback  callback,
958                              gpointer             user_data)
959 {
960   GOutputStreamClass *class;
961   GSimpleAsyncResult *simple;
962   GError *error = NULL;
963
964   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
965
966   if (!g_output_stream_set_pending (stream, &error))
967     {
968       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
969                                             callback,
970                                             user_data,
971                                             error);
972       return;
973     }
974
975   stream->priv->outstanding_callback = callback;
976   g_object_ref (stream);
977
978   class = G_OUTPUT_STREAM_GET_CLASS (stream);
979   
980   if (class->flush_async == NULL)
981     {
982       simple = g_simple_async_result_new (G_OBJECT (stream),
983                                           async_ready_callback_wrapper,
984                                           user_data,
985                                           g_output_stream_flush_async);
986       g_simple_async_result_complete_in_idle (simple);
987       g_object_unref (simple);
988       return;
989     }
990       
991   class->flush_async (stream, io_priority, cancellable,
992                       async_ready_callback_wrapper, user_data);
993 }
994
995 /**
996  * g_output_stream_flush_finish:
997  * @stream: a #GOutputStream.
998  * @result: a GAsyncResult.
999  * @error: a #GError location to store the error occurring, or %NULL to 
1000  * ignore.
1001  * 
1002  * Finishes flushing an output stream.
1003  * 
1004  * Returns: %TRUE if flush operation succeeded, %FALSE otherwise.
1005  **/
1006 gboolean
1007 g_output_stream_flush_finish (GOutputStream  *stream,
1008                               GAsyncResult   *result,
1009                               GError        **error)
1010 {
1011   GSimpleAsyncResult *simple;
1012   GOutputStreamClass *klass;
1013
1014   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1015   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1016
1017   if (G_IS_SIMPLE_ASYNC_RESULT (result))
1018     {
1019       simple = G_SIMPLE_ASYNC_RESULT (result);
1020       if (g_simple_async_result_propagate_error (simple, error))
1021         return FALSE;
1022
1023       /* Special case default implementation */
1024       if (g_simple_async_result_get_source_tag (simple) == g_output_stream_flush_async)
1025         return TRUE;
1026     }
1027
1028   klass = G_OUTPUT_STREAM_GET_CLASS (stream);
1029   return klass->flush_finish (stream, result, error);
1030 }
1031
1032
1033 /**
1034  * g_output_stream_close_async:
1035  * @stream: A #GOutputStream.
1036  * @io_priority: the io priority of the request.
1037  * @cancellable: (allow-none): optional cancellable object
1038  * @callback: (scope async): callback to call when the request is satisfied
1039  * @user_data: (closure): the data to pass to callback function
1040  *
1041  * Requests an asynchronous close of the stream, releasing resources 
1042  * related to it. When the operation is finished @callback will be 
1043  * called. You can then call g_output_stream_close_finish() to get 
1044  * the result of the operation.
1045  *
1046  * For behaviour details see g_output_stream_close().
1047  *
1048  * The asyncronous methods have a default fallback that uses threads 
1049  * to implement asynchronicity, so they are optional for inheriting 
1050  * classes. However, if you override one you must override all.
1051  **/
1052 void
1053 g_output_stream_close_async (GOutputStream       *stream,
1054                              int                  io_priority,
1055                              GCancellable        *cancellable,
1056                              GAsyncReadyCallback  callback,
1057                              gpointer             user_data)
1058 {
1059   GOutputStreamClass *class;
1060   GSimpleAsyncResult *simple;
1061   GError *error = NULL;
1062   CloseUserData *data;
1063
1064   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1065   
1066   if (stream->priv->closed)
1067     {
1068       simple = g_simple_async_result_new (G_OBJECT (stream),
1069                                           callback,
1070                                           user_data,
1071                                           g_output_stream_close_async);
1072       g_simple_async_result_complete_in_idle (simple);
1073       g_object_unref (simple);
1074       return;
1075     }
1076
1077   if (!g_output_stream_set_pending (stream, &error))
1078     {
1079       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
1080                                             callback,
1081                                             user_data,
1082                                             error);
1083       return;
1084     }
1085   
1086   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1087   stream->priv->closing = TRUE;
1088   stream->priv->outstanding_callback = callback;
1089   g_object_ref (stream);
1090
1091   data = g_slice_new0 (CloseUserData);
1092
1093   if (cancellable != NULL)
1094     data->cancellable = g_object_ref (cancellable);
1095
1096   data->io_priority = io_priority;
1097   data->user_data = user_data;
1098
1099   /* Call close_async directly if there is no need to flush, or if the flush
1100      can be done sync (in the output stream async close thread) */
1101   if (class->flush_async == NULL ||
1102       (class->flush_async == g_output_stream_real_flush_async &&
1103        (class->flush == NULL || class->close_async == g_output_stream_real_close_async)))
1104     {
1105       class->close_async (stream, io_priority, cancellable,
1106                           async_ready_close_callback_wrapper, data);
1107     }
1108   else
1109     {
1110       /* First do an async flush, then do the async close in the callback
1111          wrapper (see async_ready_close_flushed_callback_wrapper) */
1112       class->flush_async (stream, io_priority, cancellable,
1113                           async_ready_close_flushed_callback_wrapper, data);
1114     }
1115 }
1116
1117 /**
1118  * g_output_stream_close_finish:
1119  * @stream: a #GOutputStream.
1120  * @result: a #GAsyncResult.
1121  * @error: a #GError location to store the error occurring, or %NULL to 
1122  * ignore.
1123  * 
1124  * Closes an output stream.
1125  * 
1126  * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
1127  **/
1128 gboolean
1129 g_output_stream_close_finish (GOutputStream  *stream,
1130                               GAsyncResult   *result,
1131                               GError        **error)
1132 {
1133   GSimpleAsyncResult *simple;
1134   GOutputStreamClass *class;
1135
1136   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1137   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1138
1139   if (G_IS_SIMPLE_ASYNC_RESULT (result))
1140     {
1141       simple = G_SIMPLE_ASYNC_RESULT (result);
1142       if (g_simple_async_result_propagate_error (simple, error))
1143         return FALSE;
1144
1145       /* Special case already closed */
1146       if (g_simple_async_result_get_source_tag (simple) == g_output_stream_close_async)
1147         return TRUE;
1148     }
1149
1150   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1151   return class->close_finish (stream, result, error);
1152 }
1153
1154 /**
1155  * g_output_stream_is_closed:
1156  * @stream: a #GOutputStream.
1157  * 
1158  * Checks if an output stream has already been closed.
1159  * 
1160  * Returns: %TRUE if @stream is closed. %FALSE otherwise. 
1161  **/
1162 gboolean
1163 g_output_stream_is_closed (GOutputStream *stream)
1164 {
1165   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1166   
1167   return stream->priv->closed;
1168 }
1169
1170 /**
1171  * g_output_stream_is_closing:
1172  * @stream: a #GOutputStream.
1173  *
1174  * Checks if an output stream is being closed. This can be
1175  * used inside e.g. a flush implementation to see if the
1176  * flush (or other i/o operation) is called from within
1177  * the closing operation.
1178  *
1179  * Returns: %TRUE if @stream is being closed. %FALSE otherwise.
1180  *
1181  * Since: 2.24
1182  **/
1183 gboolean
1184 g_output_stream_is_closing (GOutputStream *stream)
1185 {
1186   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1187
1188   return stream->priv->closing;
1189 }
1190
1191 /**
1192  * g_output_stream_has_pending:
1193  * @stream: a #GOutputStream.
1194  * 
1195  * Checks if an ouput stream has pending actions.
1196  * 
1197  * Returns: %TRUE if @stream has pending actions. 
1198  **/
1199 gboolean
1200 g_output_stream_has_pending (GOutputStream *stream)
1201 {
1202   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1203   
1204   return stream->priv->pending;
1205 }
1206
1207 /**
1208  * g_output_stream_set_pending:
1209  * @stream: a #GOutputStream.
1210  * @error: a #GError location to store the error occurring, or %NULL to 
1211  * ignore.
1212  * 
1213  * Sets @stream to have actions pending. If the pending flag is
1214  * already set or @stream is closed, it will return %FALSE and set
1215  * @error.
1216  *
1217  * Return value: %TRUE if pending was previously unset and is now set.
1218  **/
1219 gboolean
1220 g_output_stream_set_pending (GOutputStream *stream,
1221                              GError **error)
1222 {
1223   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1224   
1225   if (stream->priv->closed)
1226     {
1227       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1228                            _("Stream is already closed"));
1229       return FALSE;
1230     }
1231   
1232   if (stream->priv->pending)
1233     {
1234       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1235                            /* Translators: This is an error you get if there is
1236                             * already an operation running against this stream when
1237                             * you try to start one */
1238                            _("Stream has outstanding operation"));
1239       return FALSE;
1240     }
1241   
1242   stream->priv->pending = TRUE;
1243   return TRUE;
1244 }
1245
1246 /**
1247  * g_output_stream_clear_pending:
1248  * @stream: output stream
1249  * 
1250  * Clears the pending flag on @stream.
1251  **/
1252 void
1253 g_output_stream_clear_pending (GOutputStream *stream)
1254 {
1255   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1256   
1257   stream->priv->pending = FALSE;
1258 }
1259
1260
1261 /********************************************
1262  *   Default implementation of async ops    *
1263  ********************************************/
1264
1265 typedef struct {
1266   const void         *buffer;
1267   gsize               count_requested;
1268   gssize              count_written;
1269
1270   GCancellable       *cancellable;
1271   gint                io_priority;
1272   gboolean            need_idle;
1273 } WriteData;
1274
1275 static void
1276 free_write_data (WriteData *op)
1277 {
1278   if (op->cancellable)
1279     g_object_unref (op->cancellable);
1280   g_slice_free (WriteData, op);
1281 }
1282
1283 static void
1284 write_async_thread (GSimpleAsyncResult *res,
1285                     GObject            *object,
1286                     GCancellable       *cancellable)
1287 {
1288   WriteData *op;
1289   GOutputStreamClass *class;
1290   GError *error = NULL;
1291
1292   class = G_OUTPUT_STREAM_GET_CLASS (object);
1293   op = g_simple_async_result_get_op_res_gpointer (res);
1294   op->count_written = class->write_fn (G_OUTPUT_STREAM (object), op->buffer, op->count_requested,
1295                                        cancellable, &error);
1296   if (op->count_written == -1)
1297     g_simple_async_result_take_error (res, error);
1298 }
1299
1300 static void write_async_pollable (GPollableOutputStream *stream,
1301                                   GSimpleAsyncResult    *result);
1302
1303 static gboolean
1304 write_async_pollable_ready (GPollableOutputStream *stream,
1305                             gpointer               user_data)
1306 {
1307   GSimpleAsyncResult *result = user_data;
1308
1309   write_async_pollable (stream, result);
1310   return FALSE;
1311 }
1312
1313 static void
1314 write_async_pollable (GPollableOutputStream *stream,
1315                       GSimpleAsyncResult    *result)
1316 {
1317   GError *error = NULL;
1318   WriteData *op = g_simple_async_result_get_op_res_gpointer (result);
1319
1320   if (g_cancellable_set_error_if_cancelled (op->cancellable, &error))
1321     op->count_written = -1;
1322   else
1323     {
1324       op->count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
1325         write_nonblocking (stream, op->buffer, op->count_requested, &error);
1326     }
1327
1328   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1329     {
1330       GSource *source;
1331
1332       g_error_free (error);
1333       op->need_idle = FALSE;
1334
1335       source = g_pollable_output_stream_create_source (stream, op->cancellable);
1336       g_source_set_callback (source,
1337                              (GSourceFunc) write_async_pollable_ready,
1338                              g_object_ref (result), g_object_unref);
1339       g_source_set_priority (source, op->io_priority);
1340       g_source_attach (source, g_main_context_get_thread_default ());
1341       g_source_unref (source);
1342       return;
1343     }
1344
1345   if (op->count_written == -1)
1346     g_simple_async_result_take_error (result, error);
1347
1348   if (op->need_idle)
1349     g_simple_async_result_complete_in_idle (result);
1350   else
1351     g_simple_async_result_complete (result);
1352 }
1353
1354 static void
1355 g_output_stream_real_write_async (GOutputStream       *stream,
1356                                   const void          *buffer,
1357                                   gsize                count,
1358                                   int                  io_priority,
1359                                   GCancellable        *cancellable,
1360                                   GAsyncReadyCallback  callback,
1361                                   gpointer             user_data)
1362 {
1363   GSimpleAsyncResult *res;
1364   WriteData *op;
1365
1366   op = g_slice_new0 (WriteData);
1367   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async);
1368   g_simple_async_result_set_op_res_gpointer (res, op, (GDestroyNotify) free_write_data);
1369   op->buffer = buffer;
1370   op->count_requested = count;
1371   op->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
1372   op->io_priority = io_priority;
1373   op->need_idle = TRUE;
1374   
1375   if (G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
1376       g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream)))
1377     write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), res);
1378   else
1379     g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable);
1380   g_object_unref (res);
1381 }
1382
1383 static gssize
1384 g_output_stream_real_write_finish (GOutputStream  *stream,
1385                                    GAsyncResult   *result,
1386                                    GError        **error)
1387 {
1388   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1389   WriteData *op;
1390
1391   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_write_async);
1392   op = g_simple_async_result_get_op_res_gpointer (simple);
1393   return op->count_written;
1394 }
1395
1396 typedef struct {
1397   GInputStream *source;
1398   GOutputStreamSpliceFlags flags;
1399   gssize bytes_copied;
1400 } SpliceData;
1401
1402 static void
1403 splice_async_thread (GSimpleAsyncResult *result,
1404                      GObject            *object,
1405                      GCancellable       *cancellable)
1406 {
1407   SpliceData *op;
1408   GOutputStreamClass *class;
1409   GError *error = NULL;
1410   GOutputStream *stream;
1411
1412   stream = G_OUTPUT_STREAM (object);
1413   class = G_OUTPUT_STREAM_GET_CLASS (object);
1414   op = g_simple_async_result_get_op_res_gpointer (result);
1415   
1416   op->bytes_copied = class->splice (stream,
1417                                     op->source,
1418                                     op->flags,
1419                                     cancellable,
1420                                     &error);
1421   if (op->bytes_copied == -1)
1422     g_simple_async_result_take_error (result, error);
1423 }
1424
1425 static void
1426 g_output_stream_real_splice_async (GOutputStream             *stream,
1427                                    GInputStream              *source,
1428                                    GOutputStreamSpliceFlags   flags,
1429                                    int                        io_priority,
1430                                    GCancellable              *cancellable,
1431                                    GAsyncReadyCallback        callback,
1432                                    gpointer                   user_data)
1433 {
1434   GSimpleAsyncResult *res;
1435   SpliceData *op;
1436
1437   op = g_new0 (SpliceData, 1);
1438   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_splice_async);
1439   g_simple_async_result_set_op_res_gpointer (res, op, g_free);
1440   op->flags = flags;
1441   op->source = source;
1442
1443   /* TODO: In the case where both source and destintion have
1444      non-threadbased async calls we can use a true async copy here */
1445   
1446   g_simple_async_result_run_in_thread (res, splice_async_thread, io_priority, cancellable);
1447   g_object_unref (res);
1448 }
1449
1450 static gssize
1451 g_output_stream_real_splice_finish (GOutputStream  *stream,
1452                                     GAsyncResult   *result,
1453                                     GError        **error)
1454 {
1455   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1456   SpliceData *op;
1457
1458   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_splice_async);
1459   op = g_simple_async_result_get_op_res_gpointer (simple);
1460   return op->bytes_copied;
1461 }
1462
1463
1464 static void
1465 flush_async_thread (GSimpleAsyncResult *res,
1466                     GObject            *object,
1467                     GCancellable       *cancellable)
1468 {
1469   GOutputStreamClass *class;
1470   gboolean result;
1471   GError *error = NULL;
1472
1473   class = G_OUTPUT_STREAM_GET_CLASS (object);
1474   result = TRUE;
1475   if (class->flush)
1476     result = class->flush (G_OUTPUT_STREAM (object), cancellable, &error);
1477
1478   if (!result)
1479     g_simple_async_result_take_error (res, error);
1480 }
1481
1482 static void
1483 g_output_stream_real_flush_async (GOutputStream       *stream,
1484                                   int                  io_priority,
1485                                   GCancellable        *cancellable,
1486                                   GAsyncReadyCallback  callback,
1487                                   gpointer             user_data)
1488 {
1489   GSimpleAsyncResult *res;
1490
1491   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async);
1492   
1493   g_simple_async_result_run_in_thread (res, flush_async_thread, io_priority, cancellable);
1494   g_object_unref (res);
1495 }
1496
1497 static gboolean
1498 g_output_stream_real_flush_finish (GOutputStream  *stream,
1499                                    GAsyncResult   *result,
1500                                    GError        **error)
1501 {
1502   return TRUE;
1503 }
1504
1505 static void
1506 close_async_thread (GSimpleAsyncResult *res,
1507                     GObject            *object,
1508                     GCancellable       *cancellable)
1509 {
1510   GOutputStreamClass *class;
1511   GError *error = NULL;
1512   gboolean result = TRUE;
1513
1514   class = G_OUTPUT_STREAM_GET_CLASS (object);
1515
1516   /* Do a flush here if there is a flush function, and we did not have to do
1517      an async flush before (see g_output_stream_close_async) */
1518   if (class->flush != NULL &&
1519       (class->flush_async == NULL ||
1520        class->flush_async == g_output_stream_real_flush_async))
1521     {
1522       result = class->flush (G_OUTPUT_STREAM (object), cancellable, &error);
1523     }
1524
1525   /* Auto handling of cancelation disabled, and ignore
1526      cancellation, since we want to close things anyway, although
1527      possibly in a quick-n-dirty way. At least we never want to leak
1528      open handles */
1529   
1530   if (class->close_fn)
1531     {
1532       /* Make sure to close, even if the flush failed (see sync close) */
1533       if (!result)
1534         class->close_fn (G_OUTPUT_STREAM (object), cancellable, NULL);
1535       else
1536         result = class->close_fn (G_OUTPUT_STREAM (object), cancellable, &error);
1537
1538       if (!result)
1539         g_simple_async_result_take_error (res, error);
1540     }
1541 }
1542
1543 static void
1544 g_output_stream_real_close_async (GOutputStream       *stream,
1545                                   int                  io_priority,
1546                                   GCancellable        *cancellable,
1547                                   GAsyncReadyCallback  callback,
1548                                   gpointer             user_data)
1549 {
1550   GSimpleAsyncResult *res;
1551   
1552   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_close_async);
1553
1554   g_simple_async_result_set_handle_cancellation (res, FALSE);
1555   
1556   g_simple_async_result_run_in_thread (res, close_async_thread, io_priority, cancellable);
1557   g_object_unref (res);
1558 }
1559
1560 static gboolean
1561 g_output_stream_real_close_finish (GOutputStream  *stream,
1562                                    GAsyncResult   *result,
1563                                    GError        **error)
1564 {
1565   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1566   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_close_async);
1567   return TRUE;
1568 }