gio: add GBytes-based input/output stream methods
[platform/upstream/glib.git] / gio / goutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24 #include "goutputstream.h"
25 #include "gcancellable.h"
26 #include "gasyncresult.h"
27 #include "gsimpleasyncresult.h"
28 #include "ginputstream.h"
29 #include "gioerror.h"
30 #include "glibintl.h"
31 #include "gpollableoutputstream.h"
32
33 /**
34  * SECTION:goutputstream
35  * @short_description: Base class for implementing streaming output
36  * @include: gio/gio.h
37  *
38  * #GOutputStream has functions to write to a stream (g_output_stream_write()),
39  * to close a stream (g_output_stream_close()) and to flush pending writes
40  * (g_output_stream_flush()). 
41  *
42  * To copy the content of an input stream to an output stream without 
43  * manually handling the reads and writes, use g_output_stream_splice(). 
44  *
45  * All of these functions have async variants too.
46  **/
47
/* Registers GOutputStream as an abstract type derived from G_TYPE_OBJECT.
 * The dispose/finalize handlers in this file chain up through the
 * g_output_stream_parent_class symbol that goes with this registration. */
G_DEFINE_ABSTRACT_TYPE (GOutputStream, g_output_stream, G_TYPE_OBJECT);

/* Per-instance private state of a GOutputStream. */
struct _GOutputStreamPrivate {
  guint closed : 1;   /* set once a close has run to completion (even if it failed) */
  guint pending : 1;  /* presumably toggled by g_output_stream_set_pending()/
                       * g_output_stream_clear_pending(), which are defined
                       * outside this view -- TODO confirm */
  guint closing : 1;  /* set while a close operation is in progress */
  GAsyncReadyCallback outstanding_callback; /* caller's callback for the in-flight async operation */
};
56
/* Forward declarations: the default implementations of the
 * GOutputStreamClass virtual methods that g_output_stream_class_init()
 * installs, plus the internal close helper used by both
 * g_output_stream_close() and the default splice implementation. */
static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
                                                    GInputStream              *source,
                                                    GOutputStreamSpliceFlags   flags,
                                                    GCancellable              *cancellable,
                                                    GError                   **error);
static void     g_output_stream_real_write_async   (GOutputStream             *stream,
                                                    const void                *buffer,
                                                    gsize                      count,
                                                    int                        io_priority,
                                                    GCancellable              *cancellable,
                                                    GAsyncReadyCallback        callback,
                                                    gpointer                   data);
static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
                                                    GAsyncResult              *result,
                                                    GError                   **error);
static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
                                                    GInputStream              *source,
                                                    GOutputStreamSpliceFlags   flags,
                                                    int                        io_priority,
                                                    GCancellable              *cancellable,
                                                    GAsyncReadyCallback        callback,
                                                    gpointer                   data);
static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
                                                    GAsyncResult              *result,
                                                    GError                   **error);
static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
                                                    int                        io_priority,
                                                    GCancellable              *cancellable,
                                                    GAsyncReadyCallback        callback,
                                                    gpointer                   data);
static gboolean g_output_stream_real_flush_finish  (GOutputStream             *stream,
                                                    GAsyncResult              *result,
                                                    GError                   **error);
static void     g_output_stream_real_close_async   (GOutputStream             *stream,
                                                    int                        io_priority,
                                                    GCancellable              *cancellable,
                                                    GAsyncReadyCallback        callback,
                                                    gpointer                   data);
static gboolean g_output_stream_real_close_finish  (GOutputStream             *stream,
                                                    GAsyncResult              *result,
                                                    GError                   **error);
static gboolean _g_output_stream_close_internal    (GOutputStream             *stream,
                                                    GCancellable              *cancellable,
                                                    GError                   **error);
101
102 static void
103 g_output_stream_finalize (GObject *object)
104 {
105   G_OBJECT_CLASS (g_output_stream_parent_class)->finalize (object);
106 }
107
108 static void
109 g_output_stream_dispose (GObject *object)
110 {
111   GOutputStream *stream;
112
113   stream = G_OUTPUT_STREAM (object);
114   
115   if (!stream->priv->closed)
116     g_output_stream_close (stream, NULL, NULL);
117
118   G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object);
119 }
120
121 static void
122 g_output_stream_class_init (GOutputStreamClass *klass)
123 {
124   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
125   
126   g_type_class_add_private (klass, sizeof (GOutputStreamPrivate));
127   
128   gobject_class->finalize = g_output_stream_finalize;
129   gobject_class->dispose = g_output_stream_dispose;
130
131   klass->splice = g_output_stream_real_splice;
132   
133   klass->write_async = g_output_stream_real_write_async;
134   klass->write_finish = g_output_stream_real_write_finish;
135   klass->splice_async = g_output_stream_real_splice_async;
136   klass->splice_finish = g_output_stream_real_splice_finish;
137   klass->flush_async = g_output_stream_real_flush_async;
138   klass->flush_finish = g_output_stream_real_flush_finish;
139   klass->close_async = g_output_stream_real_close_async;
140   klass->close_finish = g_output_stream_real_close_finish;
141 }
142
143 static void
144 g_output_stream_init (GOutputStream *stream)
145 {
146   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
147                                               G_TYPE_OUTPUT_STREAM,
148                                               GOutputStreamPrivate);
149 }
150
151 /**
152  * g_output_stream_write:
153  * @stream: a #GOutputStream.
154  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
155  * @count: the number of bytes to write
156  * @cancellable: (allow-none): optional cancellable object
157  * @error: location to store the error occurring, or %NULL to ignore
158  *
159  * Tries to write @count bytes from @buffer into the stream. Will block
160  * during the operation.
161  * 
162  * If count is 0, returns 0 and does nothing. A value of @count
163  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
164  *
165  * On success, the number of bytes written to the stream is returned.
166  * It is not an error if this is not the same as the requested size, as it
167  * can happen e.g. on a partial I/O error, or if there is not enough
168  * storage in the stream. All writes block until at least one byte
169  * is written or an error occurs; 0 is never returned (unless
170  * @count is 0).
171  * 
172  * If @cancellable is not %NULL, then the operation can be cancelled by
173  * triggering the cancellable object from another thread. If the operation
174  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
175  * operation was partially finished when the operation was cancelled the
176  * partial result will be returned, without an error.
177  *
178  * On error -1 is returned and @error is set accordingly.
179  * 
180  * Virtual: write_fn
181  *
182  * Return value: Number of bytes written, or -1 on error
183  **/
184 gssize
185 g_output_stream_write (GOutputStream  *stream,
186                        const void     *buffer,
187                        gsize           count,
188                        GCancellable   *cancellable,
189                        GError        **error)
190 {
191   GOutputStreamClass *class;
192   gssize res;
193
194   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
195   g_return_val_if_fail (buffer != NULL, 0);
196
197   if (count == 0)
198     return 0;
199   
200   if (((gssize) count) < 0)
201     {
202       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
203                    _("Too large count value passed to %s"), G_STRFUNC);
204       return -1;
205     }
206
207   class = G_OUTPUT_STREAM_GET_CLASS (stream);
208
209   if (class->write_fn == NULL) 
210     {
211       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
212                            _("Output stream doesn't implement write"));
213       return -1;
214     }
215   
216   if (!g_output_stream_set_pending (stream, error))
217     return -1;
218   
219   if (cancellable)
220     g_cancellable_push_current (cancellable);
221   
222   res = class->write_fn (stream, buffer, count, cancellable, error);
223   
224   if (cancellable)
225     g_cancellable_pop_current (cancellable);
226   
227   g_output_stream_clear_pending (stream);
228
229   return res; 
230 }
231
232 /**
233  * g_output_stream_write_all:
234  * @stream: a #GOutputStream.
235  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
236  * @count: the number of bytes to write
237  * @bytes_written: (out): location to store the number of bytes that was 
238  *     written to the stream
239  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
240  * @error: location to store the error occurring, or %NULL to ignore
241  *
242  * Tries to write @count bytes from @buffer into the stream. Will block
243  * during the operation.
244  * 
245  * This function is similar to g_output_stream_write(), except it tries to
246  * write as many bytes as requested, only stopping on an error.
247  *
248  * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
249  * is set to @count.
250  * 
251  * If there is an error during the operation %FALSE is returned and @error
252  * is set to indicate the error status, @bytes_written is updated to contain
253  * the number of bytes written into the stream before the error occurred.
254  *
255  * Return value: %TRUE on success, %FALSE if there was an error
256  **/
257 gboolean
258 g_output_stream_write_all (GOutputStream  *stream,
259                            const void     *buffer,
260                            gsize           count,
261                            gsize          *bytes_written,
262                            GCancellable   *cancellable,
263                            GError        **error)
264 {
265   gsize _bytes_written;
266   gssize res;
267
268   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
269   g_return_val_if_fail (buffer != NULL, FALSE);
270
271   _bytes_written = 0;
272   while (_bytes_written < count)
273     {
274       res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
275                                    cancellable, error);
276       if (res == -1)
277         {
278           if (bytes_written)
279             *bytes_written = _bytes_written;
280           return FALSE;
281         }
282       
283       if (res == 0)
284         g_warning ("Write returned zero without error");
285
286       _bytes_written += res;
287     }
288   
289   if (bytes_written)
290     *bytes_written = _bytes_written;
291
292   return TRUE;
293 }
294
295 /**
296  * g_output_stream_write_bytes:
297  * @stream: a #GOutputStream.
298  * @bytes: the #GBytes to write
299  * @cancellable: (allow-none): optional cancellable object
300  * @error: location to store the error occurring, or %NULL to ignore
301  *
302  * Tries to write the data from @bytes into the stream. Will block
303  * during the operation.
304  *
305  * If @bytes is 0-length, returns 0 and does nothing. A #GBytes larger
306  * than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
307  *
308  * On success, the number of bytes written to the stream is returned.
309  * It is not an error if this is not the same as the requested size, as it
310  * can happen e.g. on a partial I/O error, or if there is not enough
311  * storage in the stream. All writes block until at least one byte
312  * is written or an error occurs; 0 is never returned (unless
313  * the size of @bytes is 0).
314  *
315  * If @cancellable is not %NULL, then the operation can be cancelled by
316  * triggering the cancellable object from another thread. If the operation
317  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
318  * operation was partially finished when the operation was cancelled the
319  * partial result will be returned, without an error.
320  *
321  * On error -1 is returned and @error is set accordingly.
322  *
323  * Return value: Number of bytes written, or -1 on error
324  **/
325 gssize
326 g_output_stream_write_bytes (GOutputStream  *stream,
327                              GBytes         *bytes,
328                              GCancellable   *cancellable,
329                              GError        **error)
330 {
331   gsize size;
332   gconstpointer data;
333
334   data = g_bytes_get_data (bytes, &size);
335
336   return g_output_stream_write (stream,
337                                 data, size,
338                                 cancellable,
339                                 error);
340 }
341
342 /**
343  * g_output_stream_flush:
344  * @stream: a #GOutputStream.
345  * @cancellable: (allow-none): optional cancellable object
346  * @error: location to store the error occurring, or %NULL to ignore
347  *
348  * Forces a write of all user-space buffered data for the given
349  * @stream. Will block during the operation. Closing the stream will
350  * implicitly cause a flush.
351  *
352  * This function is optional for inherited classes.
353  * 
354  * If @cancellable is not %NULL, then the operation can be cancelled by
355  * triggering the cancellable object from another thread. If the operation
356  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
357  *
358  * Return value: %TRUE on success, %FALSE on error
359  **/
360 gboolean
361 g_output_stream_flush (GOutputStream  *stream,
362                        GCancellable   *cancellable,
363                        GError        **error)
364 {
365   GOutputStreamClass *class;
366   gboolean res;
367
368   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
369
370   if (!g_output_stream_set_pending (stream, error))
371     return FALSE;
372   
373   class = G_OUTPUT_STREAM_GET_CLASS (stream);
374
375   res = TRUE;
376   if (class->flush)
377     {
378       if (cancellable)
379         g_cancellable_push_current (cancellable);
380       
381       res = class->flush (stream, cancellable, error);
382       
383       if (cancellable)
384         g_cancellable_pop_current (cancellable);
385     }
386   
387   g_output_stream_clear_pending (stream);
388
389   return res;
390 }
391
392 /**
393  * g_output_stream_splice:
394  * @stream: a #GOutputStream.
395  * @source: a #GInputStream.
396  * @flags: a set of #GOutputStreamSpliceFlags.
397  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
398  * @error: a #GError location to store the error occurring, or %NULL to
399  * ignore.
400  *
401  * Splices an input stream into an output stream.
402  *
403  * Returns: a #gssize containing the size of the data spliced, or
404  *     -1 if an error occurred. Note that if the number of bytes
405  *     spliced is greater than %G_MAXSSIZE, then that will be
406  *     returned, and there is no way to determine the actual number
407  *     of bytes spliced.
408  **/
409 gssize
410 g_output_stream_splice (GOutputStream             *stream,
411                         GInputStream              *source,
412                         GOutputStreamSpliceFlags   flags,
413                         GCancellable              *cancellable,
414                         GError                   **error)
415 {
416   GOutputStreamClass *class;
417   gssize bytes_copied;
418
419   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
420   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
421
422   if (g_input_stream_is_closed (source))
423     {
424       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
425                            _("Source stream is already closed"));
426       return -1;
427     }
428
429   if (!g_output_stream_set_pending (stream, error))
430     return -1;
431
432   class = G_OUTPUT_STREAM_GET_CLASS (stream);
433
434   if (cancellable)
435     g_cancellable_push_current (cancellable);
436
437   bytes_copied = class->splice (stream, source, flags, cancellable, error);
438
439   if (cancellable)
440     g_cancellable_pop_current (cancellable);
441
442   g_output_stream_clear_pending (stream);
443
444   return bytes_copied;
445 }
446
447 static gssize
448 g_output_stream_real_splice (GOutputStream             *stream,
449                              GInputStream              *source,
450                              GOutputStreamSpliceFlags   flags,
451                              GCancellable              *cancellable,
452                              GError                   **error)
453 {
454   GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
455   gssize n_read, n_written;
456   gsize bytes_copied;
457   char buffer[8192], *p;
458   gboolean res;
459
460   bytes_copied = 0;
461   if (class->write_fn == NULL)
462     {
463       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
464                            _("Output stream doesn't implement write"));
465       res = FALSE;
466       goto notsupported;
467     }
468
469   res = TRUE;
470   do
471     {
472       n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
473       if (n_read == -1)
474         {
475           res = FALSE;
476           break;
477         }
478
479       if (n_read == 0)
480         break;
481
482       p = buffer;
483       while (n_read > 0)
484         {
485           n_written = class->write_fn (stream, p, n_read, cancellable, error);
486           if (n_written == -1)
487             {
488               res = FALSE;
489               break;
490             }
491
492           p += n_written;
493           n_read -= n_written;
494           bytes_copied += n_written;
495         }
496
497       if (bytes_copied > G_MAXSSIZE)
498         bytes_copied = G_MAXSSIZE;
499     }
500   while (res);
501
502  notsupported:
503   if (!res)
504     error = NULL; /* Ignore further errors */
505
506   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
507     {
508       /* Don't care about errors in source here */
509       g_input_stream_close (source, cancellable, NULL);
510     }
511
512   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
513     {
514       /* But write errors on close are bad! */
515       res = _g_output_stream_close_internal (stream, cancellable, error);
516     }
517
518   if (res)
519     return bytes_copied;
520
521   return -1;
522 }
523
524 /* Must always be called inside
525  * g_output_stream_set_pending()/g_output_stream_clear_pending(). */
526 static gboolean
527 _g_output_stream_close_internal (GOutputStream  *stream,
528                                  GCancellable   *cancellable,
529                                  GError        **error)
530 {
531   GOutputStreamClass *class;
532   gboolean res;
533
534   if (stream->priv->closed)
535     return TRUE;
536
537   class = G_OUTPUT_STREAM_GET_CLASS (stream);
538
539   stream->priv->closing = TRUE;
540
541   if (cancellable)
542     g_cancellable_push_current (cancellable);
543
544   if (class->flush)
545     res = class->flush (stream, cancellable, error);
546   else
547     res = TRUE;
548
549   if (!res)
550     {
551       /* flushing caused the error that we want to return,
552        * but we still want to close the underlying stream if possible
553        */
554       if (class->close_fn)
555         class->close_fn (stream, cancellable, NULL);
556     }
557   else
558     {
559       res = TRUE;
560       if (class->close_fn)
561         res = class->close_fn (stream, cancellable, error);
562     }
563
564   if (cancellable)
565     g_cancellable_pop_current (cancellable);
566
567   stream->priv->closing = FALSE;
568   stream->priv->closed = TRUE;
569
570   return res;
571 }
572
573 /**
574  * g_output_stream_close:
575  * @stream: A #GOutputStream.
576  * @cancellable: (allow-none): optional cancellable object
577  * @error: location to store the error occurring, or %NULL to ignore
578  *
579  * Closes the stream, releasing resources related to it.
580  *
581  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
582  * Closing a stream multiple times will not return an error.
583  *
584  * Closing a stream will automatically flush any outstanding buffers in the
585  * stream.
586  *
587  * Streams will be automatically closed when the last reference
588  * is dropped, but you might want to call this function to make sure 
589  * resources are released as early as possible.
590  *
591  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
592  * open after the stream is closed. See the documentation for the individual
593  * stream for details.
594  *
595  * On failure the first error that happened will be reported, but the close
596  * operation will finish as much as possible. A stream that failed to
597  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
598  * is important to check and report the error to the user, otherwise
599  * there might be a loss of data as all data might not be written.
600  * 
601  * If @cancellable is not %NULL, then the operation can be cancelled by
602  * triggering the cancellable object from another thread. If the operation
603  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
604  * Cancelling a close will still leave the stream closed, but some streams
605  * can use a faster close that doesn't block to e.g. check errors. On
606  * cancellation (as with any error) there is no guarantee that all written
607  * data will reach the target. 
608  *
609  * Return value: %TRUE on success, %FALSE on failure
610  **/
611 gboolean
612 g_output_stream_close (GOutputStream  *stream,
613                        GCancellable   *cancellable,
614                        GError        **error)
615 {
616   gboolean res;
617
618   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
619
620   if (stream->priv->closed)
621     return TRUE;
622
623   if (!g_output_stream_set_pending (stream, error))
624     return FALSE;
625
626   res = _g_output_stream_close_internal (stream, cancellable, error);
627
628   g_output_stream_clear_pending (stream);
629   
630   return res;
631 }
632
633 static void
634 async_ready_callback_wrapper (GObject      *source_object,
635                               GAsyncResult *res,
636                               gpointer      user_data)
637 {
638   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
639
640   g_output_stream_clear_pending (stream);
641   if (stream->priv->outstanding_callback)
642     (*stream->priv->outstanding_callback) (source_object, res, user_data);
643   g_object_unref (stream);
644 }
645
/* State carried through the asynchronous flush-then-close chain
 * (see the two close callback wrappers below). */
typedef struct {
  gint io_priority;          /* priority forwarded to the class's close_async */
  GCancellable *cancellable; /* forwarded to close_async; unreffed (if set) when the chain ends */
  GError *flush_error;       /* error from the flush step, if any; reported instead of the close result */
  gpointer user_data;        /* handed to the stream's outstanding callback */
} CloseUserData;
652
653 static void
654 async_ready_close_callback_wrapper (GObject      *source_object,
655                                     GAsyncResult *res,
656                                     gpointer      user_data)
657 {
658   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
659   CloseUserData *data = user_data;
660
661   stream->priv->closing = FALSE;
662   stream->priv->closed = TRUE;
663
664   g_output_stream_clear_pending (stream);
665
666   if (stream->priv->outstanding_callback)
667     {
668       if (data->flush_error != NULL)
669         {
670           GSimpleAsyncResult *err;
671
672           err = g_simple_async_result_new_take_error (source_object,
673                                                       stream->priv->outstanding_callback,
674                                                       data->user_data,
675                                                       data->flush_error);
676           data->flush_error = NULL;
677
678           (*stream->priv->outstanding_callback) (source_object,
679                                                  G_ASYNC_RESULT (err),
680                                                  data->user_data);
681           g_object_unref (err);
682         }
683       else
684         {
685           (*stream->priv->outstanding_callback) (source_object,
686                                                  res,
687                                                  data->user_data);
688         }
689     }
690
691   g_object_unref (stream);
692
693   if (data->cancellable)
694     g_object_unref (data->cancellable);
695
696   if (data->flush_error)
697     g_error_free (data->flush_error);
698
699   g_slice_free (CloseUserData, data);
700 }
701
702 static void
703 async_ready_close_flushed_callback_wrapper (GObject      *source_object,
704                                             GAsyncResult *res,
705                                             gpointer      user_data)
706 {
707   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
708   GOutputStreamClass *class;
709   CloseUserData *data = user_data;
710   GSimpleAsyncResult *simple;
711
712   /* propagate the possible error */
713   if (G_IS_SIMPLE_ASYNC_RESULT (res))
714     {
715       simple = G_SIMPLE_ASYNC_RESULT (res);
716       g_simple_async_result_propagate_error (simple, &data->flush_error);
717     }
718
719   class = G_OUTPUT_STREAM_GET_CLASS (stream);
720
721   /* we still close, even if there was a flush error */
722   class->close_async (stream, data->io_priority, data->cancellable,
723                       async_ready_close_callback_wrapper, user_data);
724 }
725
726 /**
727  * g_output_stream_write_async:
728  * @stream: A #GOutputStream.
729  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
730  * @count: the number of bytes to write
731  * @io_priority: the io priority of the request.
732  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
733  * @callback: (scope async): callback to call when the request is satisfied
734  * @user_data: (closure): the data to pass to callback function
735  *
736  * Request an asynchronous write of @count bytes from @buffer into 
737  * the stream. When the operation is finished @callback will be called.
738  * You can then call g_output_stream_write_finish() to get the result of the 
739  * operation.
740  *
741  * During an async request no other sync and async calls are allowed, 
742  * and will result in %G_IO_ERROR_PENDING errors. 
743  *
744  * A value of @count larger than %G_MAXSSIZE will cause a 
745  * %G_IO_ERROR_INVALID_ARGUMENT error.
746  *
747  * On success, the number of bytes written will be passed to the
748  * @callback. It is not an error if this is not the same as the 
749  * requested size, as it can happen e.g. on a partial I/O error, 
750  * but generally we try to write as many bytes as requested. 
751  *
752  * You are guaranteed that this method will never fail with
753  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
754  * method will just wait until this changes.
755  *
756  * Any outstanding I/O request with higher priority (lower numerical 
757  * value) will be executed before an outstanding request with lower 
758  * priority. Default priority is %G_PRIORITY_DEFAULT.
759  *
760  * The asynchronous methods have a default fallback that uses threads 
761  * to implement asynchronicity, so they are optional for inheriting 
762  * classes. However, if you override one you must override all.
763  *
764  * For the synchronous, blocking version of this function, see 
765  * g_output_stream_write().
766  **/
767 void
768 g_output_stream_write_async (GOutputStream       *stream,
769                              const void          *buffer,
770                              gsize                count,
771                              int                  io_priority,
772                              GCancellable        *cancellable,
773                              GAsyncReadyCallback  callback,
774                              gpointer             user_data)
775 {
776   GOutputStreamClass *class;
777   GSimpleAsyncResult *simple;
778   GError *error = NULL;
779
780   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
781   g_return_if_fail (buffer != NULL);
782
783   if (count == 0)
784     {
785       simple = g_simple_async_result_new (G_OBJECT (stream),
786                                           callback,
787                                           user_data,
788                                           g_output_stream_write_async);
789       g_simple_async_result_complete_in_idle (simple);
790       g_object_unref (simple);
791       return;
792     }
793
794   if (((gssize) count) < 0)
795     {
796       g_simple_async_report_error_in_idle (G_OBJECT (stream),
797                                            callback,
798                                            user_data,
799                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
800                                            _("Too large count value passed to %s"),
801                                            G_STRFUNC);
802       return;
803     }
804
805   if (!g_output_stream_set_pending (stream, &error))
806     {
807       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
808                                             callback,
809                                             user_data,
810                                             error);
811       return;
812     }
813   
814   class = G_OUTPUT_STREAM_GET_CLASS (stream);
815
816   stream->priv->outstanding_callback = callback;
817   g_object_ref (stream);
818   class->write_async (stream, buffer, count, io_priority, cancellable,
819                       async_ready_callback_wrapper, user_data);
820 }
821
822 /**
823  * g_output_stream_write_finish:
824  * @stream: a #GOutputStream.
825  * @result: a #GAsyncResult.
826  * @error: a #GError location to store the error occurring, or %NULL to 
827  * ignore.
828  * 
829  * Finishes a stream write operation.
830  * 
831  * Returns: a #gssize containing the number of bytes written to the stream.
832  **/
833 gssize
834 g_output_stream_write_finish (GOutputStream  *stream,
835                               GAsyncResult   *result,
836                               GError        **error)
837 {
838   GSimpleAsyncResult *simple;
839   GOutputStreamClass *class;
840
841   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
842   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
843
844   if (G_IS_SIMPLE_ASYNC_RESULT (result))
845     {
846       simple = G_SIMPLE_ASYNC_RESULT (result);
847       if (g_simple_async_result_propagate_error (simple, error))
848         return -1;
849
850       /* Special case writes of 0 bytes */
851       if (g_simple_async_result_get_source_tag (simple) == g_output_stream_write_async)
852         return 0;
853     }
854   
855   class = G_OUTPUT_STREAM_GET_CLASS (stream);
856   return class->write_finish (stream, result, error);
857 }
858
859 static void
860 write_bytes_callback (GObject      *stream,
861                       GAsyncResult *result,
862                       gpointer      user_data)
863 {
864   GSimpleAsyncResult *simple = user_data;
865   GError *error = NULL;
866   gssize nwrote;
867
868   nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream),
869                                          result, &error);
870   if (nwrote == -1)
871     g_simple_async_result_take_error (simple, error);
872   else
873     g_simple_async_result_set_op_res_gssize (simple, nwrote);
874   g_simple_async_result_complete (simple);
875   g_object_unref (simple);
876 }
877
878 /**
879  * g_output_stream_write_bytes_async:
880  * @stream: A #GOutputStream.
881  * @bytes: The bytes to write
882  * @io_priority: the io priority of the request.
883  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
884  * @callback: (scope async): callback to call when the request is satisfied
885  * @user_data: (closure): the data to pass to callback function
886  *
887  * Request an asynchronous write of the data in @bytes to the stream.
888  * When the operation is finished @callback will be called. You can
889  * then call g_output_stream_write_bytes_finish() to get the result of
890  * the operation.
891  *
892  * During an async request no other sync and async calls are allowed,
893  * and will result in %G_IO_ERROR_PENDING errors.
894  *
895  * A #GBytes larger than %G_MAXSSIZE will cause a
896  * %G_IO_ERROR_INVALID_ARGUMENT error.
897  *
898  * On success, the number of bytes written will be passed to the
899  * @callback. It is not an error if this is not the same as the
900  * requested size, as it can happen e.g. on a partial I/O error,
901  * but generally we try to write as many bytes as requested.
902  *
903  * You are guaranteed that this method will never fail with
904  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
905  * method will just wait until this changes.
906  *
907  * Any outstanding I/O request with higher priority (lower numerical
908  * value) will be executed before an outstanding request with lower
909  * priority. Default priority is %G_PRIORITY_DEFAULT.
910  *
911  * For the synchronous, blocking version of this function, see
912  * g_output_stream_write_bytes().
913  **/
914 void
915 g_output_stream_write_bytes_async (GOutputStream       *stream,
916                                    GBytes              *bytes,
917                                    int                  io_priority,
918                                    GCancellable        *cancellable,
919                                    GAsyncReadyCallback  callback,
920                                    gpointer             user_data)
921 {
922   GSimpleAsyncResult *simple;
923   gsize size;
924   gconstpointer data;
925
926   data = g_bytes_get_data (bytes, &size);
927
928   simple = g_simple_async_result_new (G_OBJECT (stream),
929                                       callback, user_data,
930                                       g_output_stream_write_bytes_async);
931   g_simple_async_result_set_op_res_gpointer (simple, g_bytes_ref (bytes),
932                                              (GDestroyNotify) g_bytes_unref);
933
934   g_output_stream_write_async (stream,
935                                data, size,
936                                io_priority,
937                                cancellable,
938                                write_bytes_callback,
939                                simple);
940 }
941
942 /**
943  * g_output_stream_write_bytes_finish:
944  * @stream: a #GOutputStream.
945  * @result: a #GAsyncResult.
946  * @error: a #GError location to store the error occurring, or %NULL to
947  * ignore.
948  *
949  * Finishes a stream write-from-#GBytes operation.
950  *
951  * Returns: a #gssize containing the number of bytes written to the stream.
952  **/
953 gssize
954 g_output_stream_write_bytes_finish (GOutputStream  *stream,
955                                     GAsyncResult   *result,
956                                     GError        **error)
957 {
958   GSimpleAsyncResult *simple;
959
960   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
961   g_return_val_if_fail (g_simple_async_result_is_valid (result, G_OBJECT (stream), g_output_stream_write_bytes_async), -1);
962
963   simple = G_SIMPLE_ASYNC_RESULT (result);
964   if (g_simple_async_result_propagate_error (simple, error))
965     return -1;
966   return g_simple_async_result_get_op_res_gssize (simple);
967 }
968
969 typedef struct {
970   GInputStream *source;
971   gpointer user_data;
972   GAsyncReadyCallback callback;
973 } SpliceUserData;
974
975 static void
976 async_ready_splice_callback_wrapper (GObject      *source_object,
977                                      GAsyncResult *res,
978                                      gpointer     _data)
979 {
980   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
981   SpliceUserData *data = _data;
982   
983   g_output_stream_clear_pending (stream);
984   
985   if (data->callback)
986     (*data->callback) (source_object, res, data->user_data);
987   
988   g_object_unref (stream);
989   g_object_unref (data->source);
990   g_free (data);
991 }
992
993 /**
994  * g_output_stream_splice_async:
995  * @stream: a #GOutputStream.
996  * @source: a #GInputStream. 
997  * @flags: a set of #GOutputStreamSpliceFlags.
998  * @io_priority: the io priority of the request.
999  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
1000  * @callback: (scope async): a #GAsyncReadyCallback. 
1001  * @user_data: (closure): user data passed to @callback.
1002  * 
1003  * Splices a stream asynchronously.
1004  * When the operation is finished @callback will be called.
1005  * You can then call g_output_stream_splice_finish() to get the 
1006  * result of the operation.
1007  *
1008  * For the synchronous, blocking version of this function, see 
1009  * g_output_stream_splice().
1010  **/
1011 void
1012 g_output_stream_splice_async (GOutputStream            *stream,
1013                               GInputStream             *source,
1014                               GOutputStreamSpliceFlags  flags,
1015                               int                       io_priority,
1016                               GCancellable             *cancellable,
1017                               GAsyncReadyCallback       callback,
1018                               gpointer                  user_data)
1019 {
1020   GOutputStreamClass *class;
1021   SpliceUserData *data;
1022   GError *error = NULL;
1023
1024   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1025   g_return_if_fail (G_IS_INPUT_STREAM (source));
1026
1027   if (g_input_stream_is_closed (source))
1028     {
1029       g_simple_async_report_error_in_idle (G_OBJECT (stream),
1030                                            callback,
1031                                            user_data,
1032                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
1033                                            _("Source stream is already closed"));
1034       return;
1035     }
1036   
1037   if (!g_output_stream_set_pending (stream, &error))
1038     {
1039       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
1040                                             callback,
1041                                             user_data,
1042                                             error);
1043       return;
1044     }
1045
1046   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1047
1048   data = g_new0 (SpliceUserData, 1);
1049   data->callback = callback;
1050   data->user_data = user_data;
1051   data->source = g_object_ref (source);
1052   
1053   g_object_ref (stream);
1054   class->splice_async (stream, source, flags, io_priority, cancellable,
1055                       async_ready_splice_callback_wrapper, data);
1056 }
1057
1058 /**
1059  * g_output_stream_splice_finish:
1060  * @stream: a #GOutputStream.
1061  * @result: a #GAsyncResult.
1062  * @error: a #GError location to store the error occurring, or %NULL to 
1063  * ignore.
1064  *
1065  * Finishes an asynchronous stream splice operation.
1066  * 
1067  * Returns: a #gssize of the number of bytes spliced. Note that if the
1068  *     number of bytes spliced is greater than %G_MAXSSIZE, then that
1069  *     will be returned, and there is no way to determine the actual
1070  *     number of bytes spliced.
1071  **/
1072 gssize
1073 g_output_stream_splice_finish (GOutputStream  *stream,
1074                                GAsyncResult   *result,
1075                                GError        **error)
1076 {
1077   GSimpleAsyncResult *simple;
1078   GOutputStreamClass *class;
1079
1080   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
1081   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
1082
1083   if (G_IS_SIMPLE_ASYNC_RESULT (result))
1084     {
1085       simple = G_SIMPLE_ASYNC_RESULT (result);
1086       if (g_simple_async_result_propagate_error (simple, error))
1087         return -1;
1088     }
1089   
1090   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1091   return class->splice_finish (stream, result, error);
1092 }
1093
1094 /**
1095  * g_output_stream_flush_async:
1096  * @stream: a #GOutputStream.
1097  * @io_priority: the io priority of the request.
1098  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
1099  * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied
1100  * @user_data: (closure): the data to pass to callback function
1101  * 
1102  * Forces an asynchronous write of all user-space buffered data for
1103  * the given @stream.
1104  * For behaviour details see g_output_stream_flush().
1105  *
1106  * When the operation is finished @callback will be 
1107  * called. You can then call g_output_stream_flush_finish() to get the 
1108  * result of the operation.
1109  **/
1110 void
1111 g_output_stream_flush_async (GOutputStream       *stream,
1112                              int                  io_priority,
1113                              GCancellable        *cancellable,
1114                              GAsyncReadyCallback  callback,
1115                              gpointer             user_data)
1116 {
1117   GOutputStreamClass *class;
1118   GSimpleAsyncResult *simple;
1119   GError *error = NULL;
1120
1121   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1122
1123   if (!g_output_stream_set_pending (stream, &error))
1124     {
1125       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
1126                                             callback,
1127                                             user_data,
1128                                             error);
1129       return;
1130     }
1131
1132   stream->priv->outstanding_callback = callback;
1133   g_object_ref (stream);
1134
1135   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1136   
1137   if (class->flush_async == NULL)
1138     {
1139       simple = g_simple_async_result_new (G_OBJECT (stream),
1140                                           async_ready_callback_wrapper,
1141                                           user_data,
1142                                           g_output_stream_flush_async);
1143       g_simple_async_result_complete_in_idle (simple);
1144       g_object_unref (simple);
1145       return;
1146     }
1147       
1148   class->flush_async (stream, io_priority, cancellable,
1149                       async_ready_callback_wrapper, user_data);
1150 }
1151
1152 /**
1153  * g_output_stream_flush_finish:
1154  * @stream: a #GOutputStream.
1155  * @result: a #GAsyncResult.
1156  * @error: a #GError location to store the error occurring, or %NULL to 
1157  * ignore.
1158  * 
1159  * Finishes flushing an output stream.
1160  * 
1161  * Returns: %TRUE if flush operation succeeded, %FALSE otherwise.
1162  **/
1163 gboolean
1164 g_output_stream_flush_finish (GOutputStream  *stream,
1165                               GAsyncResult   *result,
1166                               GError        **error)
1167 {
1168   GSimpleAsyncResult *simple;
1169   GOutputStreamClass *klass;
1170
1171   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1172   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1173
1174   if (G_IS_SIMPLE_ASYNC_RESULT (result))
1175     {
1176       simple = G_SIMPLE_ASYNC_RESULT (result);
1177       if (g_simple_async_result_propagate_error (simple, error))
1178         return FALSE;
1179
1180       /* Special case default implementation */
1181       if (g_simple_async_result_get_source_tag (simple) == g_output_stream_flush_async)
1182         return TRUE;
1183     }
1184
1185   klass = G_OUTPUT_STREAM_GET_CLASS (stream);
1186   return klass->flush_finish (stream, result, error);
1187 }
1188
1189
1190 /**
1191  * g_output_stream_close_async:
1192  * @stream: A #GOutputStream.
1193  * @io_priority: the io priority of the request.
1194  * @cancellable: (allow-none): optional cancellable object
1195  * @callback: (scope async): callback to call when the request is satisfied
1196  * @user_data: (closure): the data to pass to callback function
1197  *
1198  * Requests an asynchronous close of the stream, releasing resources 
1199  * related to it. When the operation is finished @callback will be 
1200  * called. You can then call g_output_stream_close_finish() to get 
1201  * the result of the operation.
1202  *
1203  * For behaviour details see g_output_stream_close().
1204  *
1205  * The asynchronous methods have a default fallback that uses threads 
1206  * to implement asynchronicity, so they are optional for inheriting 
1207  * classes. However, if you override one you must override all.
1208  **/
1209 void
1210 g_output_stream_close_async (GOutputStream       *stream,
1211                              int                  io_priority,
1212                              GCancellable        *cancellable,
1213                              GAsyncReadyCallback  callback,
1214                              gpointer             user_data)
1215 {
1216   GOutputStreamClass *class;
1217   GSimpleAsyncResult *simple;
1218   GError *error = NULL;
1219   CloseUserData *data;
1220
1221   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1222   
1223   if (stream->priv->closed)
1224     {
1225       simple = g_simple_async_result_new (G_OBJECT (stream),
1226                                           callback,
1227                                           user_data,
1228                                           g_output_stream_close_async);
1229       g_simple_async_result_complete_in_idle (simple);
1230       g_object_unref (simple);
1231       return;
1232     }
1233
1234   if (!g_output_stream_set_pending (stream, &error))
1235     {
1236       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
1237                                             callback,
1238                                             user_data,
1239                                             error);
1240       return;
1241     }
1242   
1243   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1244   stream->priv->closing = TRUE;
1245   stream->priv->outstanding_callback = callback;
1246   g_object_ref (stream);
1247
1248   data = g_slice_new0 (CloseUserData);
1249
1250   if (cancellable != NULL)
1251     data->cancellable = g_object_ref (cancellable);
1252
1253   data->io_priority = io_priority;
1254   data->user_data = user_data;
1255
1256   /* Call close_async directly if there is no need to flush, or if the flush
1257      can be done sync (in the output stream async close thread) */
1258   if (class->flush_async == NULL ||
1259       (class->flush_async == g_output_stream_real_flush_async &&
1260        (class->flush == NULL || class->close_async == g_output_stream_real_close_async)))
1261     {
1262       class->close_async (stream, io_priority, cancellable,
1263                           async_ready_close_callback_wrapper, data);
1264     }
1265   else
1266     {
1267       /* First do an async flush, then do the async close in the callback
1268          wrapper (see async_ready_close_flushed_callback_wrapper) */
1269       class->flush_async (stream, io_priority, cancellable,
1270                           async_ready_close_flushed_callback_wrapper, data);
1271     }
1272 }
1273
1274 /**
1275  * g_output_stream_close_finish:
1276  * @stream: a #GOutputStream.
1277  * @result: a #GAsyncResult.
1278  * @error: a #GError location to store the error occurring, or %NULL to 
1279  * ignore.
1280  * 
1281  * Finishes closing an output stream.
1282  * 
1283  * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
1284  **/
1285 gboolean
1286 g_output_stream_close_finish (GOutputStream  *stream,
1287                               GAsyncResult   *result,
1288                               GError        **error)
1289 {
1290   GSimpleAsyncResult *simple;
1291   GOutputStreamClass *class;
1292
1293   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1294   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1295
1296   if (G_IS_SIMPLE_ASYNC_RESULT (result))
1297     {
1298       simple = G_SIMPLE_ASYNC_RESULT (result);
1299       if (g_simple_async_result_propagate_error (simple, error))
1300         return FALSE;
1301
1302       /* Special case already closed */
1303       if (g_simple_async_result_get_source_tag (simple) == g_output_stream_close_async)
1304         return TRUE;
1305     }
1306
1307   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1308   return class->close_finish (stream, result, error);
1309 }
1310
1311 /**
1312  * g_output_stream_is_closed:
1313  * @stream: a #GOutputStream.
1314  * 
1315  * Checks if an output stream has already been closed.
1316  * 
1317  * Returns: %TRUE if @stream is closed. %FALSE otherwise. 
1318  **/
1319 gboolean
1320 g_output_stream_is_closed (GOutputStream *stream)
1321 {
1322   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1323   
1324   return stream->priv->closed;
1325 }
1326
1327 /**
1328  * g_output_stream_is_closing:
1329  * @stream: a #GOutputStream.
1330  *
1331  * Checks if an output stream is being closed. This can be
1332  * used inside e.g. a flush implementation to see if the
1333  * flush (or other i/o operation) is called from within
1334  * the closing operation.
1335  *
1336  * Returns: %TRUE if @stream is being closed. %FALSE otherwise.
1337  *
1338  * Since: 2.24
1339  **/
1340 gboolean
1341 g_output_stream_is_closing (GOutputStream *stream)
1342 {
1343   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1344
1345   return stream->priv->closing;
1346 }
1347
1348 /**
1349  * g_output_stream_has_pending:
1350  * @stream: a #GOutputStream.
1351  * 
1352  * Checks if an output stream has pending actions.
1353  * 
1354  * Returns: %TRUE if @stream has pending actions. 
1355  **/
1356 gboolean
1357 g_output_stream_has_pending (GOutputStream *stream)
1358 {
1359   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1360   
1361   return stream->priv->pending;
1362 }
1363
1364 /**
1365  * g_output_stream_set_pending:
1366  * @stream: a #GOutputStream.
1367  * @error: a #GError location to store the error occurring, or %NULL to 
1368  * ignore.
1369  * 
1370  * Sets @stream to have actions pending. If the pending flag is
1371  * already set or @stream is closed, it will return %FALSE and set
1372  * @error.
1373  *
1374  * Return value: %TRUE if pending was previously unset and is now set.
1375  **/
1376 gboolean
1377 g_output_stream_set_pending (GOutputStream *stream,
1378                              GError **error)
1379 {
1380   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1381   
1382   if (stream->priv->closed)
1383     {
1384       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1385                            _("Stream is already closed"));
1386       return FALSE;
1387     }
1388   
1389   if (stream->priv->pending)
1390     {
1391       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1392                            /* Translators: This is an error you get if there is
1393                             * already an operation running against this stream when
1394                             * you try to start one */
1395                            _("Stream has outstanding operation"));
1396       return FALSE;
1397     }
1398   
1399   stream->priv->pending = TRUE;
1400   return TRUE;
1401 }
1402
1403 /**
1404  * g_output_stream_clear_pending:
1405  * @stream: output stream
1406  * 
1407  * Clears the pending flag on @stream.
1408  **/
1409 void
1410 g_output_stream_clear_pending (GOutputStream *stream)
1411 {
1412   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1413   
1414   stream->priv->pending = FALSE;
1415 }
1416
1417
1418 /********************************************
1419  *   Default implementation of async ops    *
1420  ********************************************/
1421
1422 typedef struct {
1423   const void         *buffer;
1424   gsize               count_requested;
1425   gssize              count_written;
1426
1427   GCancellable       *cancellable;
1428   gint                io_priority;
1429   gboolean            need_idle;
1430 } WriteData;
1431
1432 static void
1433 free_write_data (WriteData *op)
1434 {
1435   if (op->cancellable)
1436     g_object_unref (op->cancellable);
1437   g_slice_free (WriteData, op);
1438 }
1439
1440 static void
1441 write_async_thread (GSimpleAsyncResult *res,
1442                     GObject            *object,
1443                     GCancellable       *cancellable)
1444 {
1445   WriteData *op;
1446   GOutputStreamClass *class;
1447   GError *error = NULL;
1448
1449   class = G_OUTPUT_STREAM_GET_CLASS (object);
1450   op = g_simple_async_result_get_op_res_gpointer (res);
1451   op->count_written = class->write_fn (G_OUTPUT_STREAM (object), op->buffer, op->count_requested,
1452                                        cancellable, &error);
1453   if (op->count_written == -1)
1454     g_simple_async_result_take_error (res, error);
1455 }
1456
1457 static void write_async_pollable (GPollableOutputStream *stream,
1458                                   GSimpleAsyncResult    *result);
1459
1460 static gboolean
1461 write_async_pollable_ready (GPollableOutputStream *stream,
1462                             gpointer               user_data)
1463 {
1464   GSimpleAsyncResult *result = user_data;
1465
1466   write_async_pollable (stream, result);
1467   return FALSE;
1468 }
1469
1470 static void
1471 write_async_pollable (GPollableOutputStream *stream,
1472                       GSimpleAsyncResult    *result)
1473 {
1474   GError *error = NULL;
1475   WriteData *op = g_simple_async_result_get_op_res_gpointer (result);
1476
1477   if (g_cancellable_set_error_if_cancelled (op->cancellable, &error))
1478     op->count_written = -1;
1479   else
1480     {
1481       op->count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
1482         write_nonblocking (stream, op->buffer, op->count_requested, &error);
1483     }
1484
1485   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1486     {
1487       GSource *source;
1488
1489       g_error_free (error);
1490       op->need_idle = FALSE;
1491
1492       source = g_pollable_output_stream_create_source (stream, op->cancellable);
1493       g_source_set_callback (source,
1494                              (GSourceFunc) write_async_pollable_ready,
1495                              g_object_ref (result), g_object_unref);
1496       g_source_set_priority (source, op->io_priority);
1497       g_source_attach (source, g_main_context_get_thread_default ());
1498       g_source_unref (source);
1499       return;
1500     }
1501
1502   if (op->count_written == -1)
1503     g_simple_async_result_take_error (result, error);
1504
1505   if (op->need_idle)
1506     g_simple_async_result_complete_in_idle (result);
1507   else
1508     g_simple_async_result_complete (result);
1509 }
1510
1511 static void
1512 g_output_stream_real_write_async (GOutputStream       *stream,
1513                                   const void          *buffer,
1514                                   gsize                count,
1515                                   int                  io_priority,
1516                                   GCancellable        *cancellable,
1517                                   GAsyncReadyCallback  callback,
1518                                   gpointer             user_data)
1519 {
1520   GSimpleAsyncResult *res;
1521   WriteData *op;
1522
1523   op = g_slice_new0 (WriteData);
1524   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async);
1525   g_simple_async_result_set_op_res_gpointer (res, op, (GDestroyNotify) free_write_data);
1526   op->buffer = buffer;
1527   op->count_requested = count;
1528   op->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
1529   op->io_priority = io_priority;
1530   op->need_idle = TRUE;
1531   
1532   if (G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
1533       g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream)))
1534     write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), res);
1535   else
1536     g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable);
1537   g_object_unref (res);
1538 }
1539
1540 static gssize
1541 g_output_stream_real_write_finish (GOutputStream  *stream,
1542                                    GAsyncResult   *result,
1543                                    GError        **error)
1544 {
1545   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1546   WriteData *op;
1547
1548   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_write_async);
1549   op = g_simple_async_result_get_op_res_gpointer (simple);
1550   return op->count_written;
1551 }
1552
1553 typedef struct {
1554   GInputStream *source;
1555   GOutputStreamSpliceFlags flags;
1556   gssize bytes_copied;
1557 } SpliceData;
1558
1559 static void
1560 splice_async_thread (GSimpleAsyncResult *result,
1561                      GObject            *object,
1562                      GCancellable       *cancellable)
1563 {
1564   SpliceData *op;
1565   GOutputStreamClass *class;
1566   GError *error = NULL;
1567   GOutputStream *stream;
1568
1569   stream = G_OUTPUT_STREAM (object);
1570   class = G_OUTPUT_STREAM_GET_CLASS (object);
1571   op = g_simple_async_result_get_op_res_gpointer (result);
1572   
1573   op->bytes_copied = class->splice (stream,
1574                                     op->source,
1575                                     op->flags,
1576                                     cancellable,
1577                                     &error);
1578   if (op->bytes_copied == -1)
1579     g_simple_async_result_take_error (result, error);
1580 }
1581
1582 static void
1583 g_output_stream_real_splice_async (GOutputStream             *stream,
1584                                    GInputStream              *source,
1585                                    GOutputStreamSpliceFlags   flags,
1586                                    int                        io_priority,
1587                                    GCancellable              *cancellable,
1588                                    GAsyncReadyCallback        callback,
1589                                    gpointer                   user_data)
1590 {
1591   GSimpleAsyncResult *res;
1592   SpliceData *op;
1593
1594   op = g_new0 (SpliceData, 1);
1595   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_splice_async);
1596   g_simple_async_result_set_op_res_gpointer (res, op, g_free);
1597   op->flags = flags;
1598   op->source = source;
1599
1600   /* TODO: In the case where both source and destintion have
1601      non-threadbased async calls we can use a true async copy here */
1602   
1603   g_simple_async_result_run_in_thread (res, splice_async_thread, io_priority, cancellable);
1604   g_object_unref (res);
1605 }
1606
1607 static gssize
1608 g_output_stream_real_splice_finish (GOutputStream  *stream,
1609                                     GAsyncResult   *result,
1610                                     GError        **error)
1611 {
1612   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1613   SpliceData *op;
1614
1615   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_splice_async);
1616   op = g_simple_async_result_get_op_res_gpointer (simple);
1617   return op->bytes_copied;
1618 }
1619
1620
1621 static void
1622 flush_async_thread (GSimpleAsyncResult *res,
1623                     GObject            *object,
1624                     GCancellable       *cancellable)
1625 {
1626   GOutputStreamClass *class;
1627   gboolean result;
1628   GError *error = NULL;
1629
1630   class = G_OUTPUT_STREAM_GET_CLASS (object);
1631   result = TRUE;
1632   if (class->flush)
1633     result = class->flush (G_OUTPUT_STREAM (object), cancellable, &error);
1634
1635   if (!result)
1636     g_simple_async_result_take_error (res, error);
1637 }
1638
1639 static void
1640 g_output_stream_real_flush_async (GOutputStream       *stream,
1641                                   int                  io_priority,
1642                                   GCancellable        *cancellable,
1643                                   GAsyncReadyCallback  callback,
1644                                   gpointer             user_data)
1645 {
1646   GSimpleAsyncResult *res;
1647
1648   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async);
1649   
1650   g_simple_async_result_run_in_thread (res, flush_async_thread, io_priority, cancellable);
1651   g_object_unref (res);
1652 }
1653
1654 static gboolean
1655 g_output_stream_real_flush_finish (GOutputStream  *stream,
1656                                    GAsyncResult   *result,
1657                                    GError        **error)
1658 {
1659   return TRUE;
1660 }
1661
1662 static void
1663 close_async_thread (GSimpleAsyncResult *res,
1664                     GObject            *object,
1665                     GCancellable       *cancellable)
1666 {
1667   GOutputStreamClass *class;
1668   GError *error = NULL;
1669   gboolean result = TRUE;
1670
1671   class = G_OUTPUT_STREAM_GET_CLASS (object);
1672
1673   /* Do a flush here if there is a flush function, and we did not have to do
1674      an async flush before (see g_output_stream_close_async) */
1675   if (class->flush != NULL &&
1676       (class->flush_async == NULL ||
1677        class->flush_async == g_output_stream_real_flush_async))
1678     {
1679       result = class->flush (G_OUTPUT_STREAM (object), cancellable, &error);
1680     }
1681
1682   /* Auto handling of cancelation disabled, and ignore
1683      cancellation, since we want to close things anyway, although
1684      possibly in a quick-n-dirty way. At least we never want to leak
1685      open handles */
1686   
1687   if (class->close_fn)
1688     {
1689       /* Make sure to close, even if the flush failed (see sync close) */
1690       if (!result)
1691         class->close_fn (G_OUTPUT_STREAM (object), cancellable, NULL);
1692       else
1693         result = class->close_fn (G_OUTPUT_STREAM (object), cancellable, &error);
1694
1695       if (!result)
1696         g_simple_async_result_take_error (res, error);
1697     }
1698 }
1699
1700 static void
1701 g_output_stream_real_close_async (GOutputStream       *stream,
1702                                   int                  io_priority,
1703                                   GCancellable        *cancellable,
1704                                   GAsyncReadyCallback  callback,
1705                                   gpointer             user_data)
1706 {
1707   GSimpleAsyncResult *res;
1708   
1709   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_close_async);
1710
1711   g_simple_async_result_set_handle_cancellation (res, FALSE);
1712   
1713   g_simple_async_result_run_in_thread (res, close_async_thread, io_priority, cancellable);
1714   g_object_unref (res);
1715 }
1716
1717 static gboolean
1718 g_output_stream_real_close_finish (GOutputStream  *stream,
1719                                    GAsyncResult   *result,
1720                                    GError        **error)
1721 {
1722   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1723   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_close_async);
1724   return TRUE;
1725 }