gio: add g_async_result_is_tagged()
[platform/upstream/glib.git] / gio / goutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24 #include "goutputstream.h"
25 #include "gcancellable.h"
26 #include "gasyncresult.h"
27 #include "gsimpleasyncresult.h"
28 #include "ginputstream.h"
29 #include "gioerror.h"
30 #include "glibintl.h"
31 #include "gpollableoutputstream.h"
32
33 /**
34  * SECTION:goutputstream
35  * @short_description: Base class for implementing streaming output
36  * @include: gio/gio.h
37  *
38  * #GOutputStream has functions to write to a stream (g_output_stream_write()),
39  * to close a stream (g_output_stream_close()) and to flush pending writes
40  * (g_output_stream_flush()). 
41  *
42  * To copy the content of an input stream to an output stream without 
43  * manually handling the reads and writes, use g_output_stream_splice(). 
44  *
45  * All of these functions have async variants too.
46  **/
47
48 G_DEFINE_ABSTRACT_TYPE (GOutputStream, g_output_stream, G_TYPE_OBJECT);
49
50 struct _GOutputStreamPrivate {
51   guint closed : 1;
52   guint pending : 1;
53   guint closing : 1;
54   GAsyncReadyCallback outstanding_callback;
55 };
56
57 static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
58                                                     GInputStream              *source,
59                                                     GOutputStreamSpliceFlags   flags,
60                                                     GCancellable              *cancellable,
61                                                     GError                   **error);
62 static void     g_output_stream_real_write_async   (GOutputStream             *stream,
63                                                     const void                *buffer,
64                                                     gsize                      count,
65                                                     int                        io_priority,
66                                                     GCancellable              *cancellable,
67                                                     GAsyncReadyCallback        callback,
68                                                     gpointer                   data);
69 static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
70                                                     GAsyncResult              *result,
71                                                     GError                   **error);
72 static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
73                                                     GInputStream              *source,
74                                                     GOutputStreamSpliceFlags   flags,
75                                                     int                        io_priority,
76                                                     GCancellable              *cancellable,
77                                                     GAsyncReadyCallback        callback,
78                                                     gpointer                   data);
79 static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
80                                                     GAsyncResult              *result,
81                                                     GError                   **error);
82 static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
83                                                     int                        io_priority,
84                                                     GCancellable              *cancellable,
85                                                     GAsyncReadyCallback        callback,
86                                                     gpointer                   data);
87 static gboolean g_output_stream_real_flush_finish  (GOutputStream             *stream,
88                                                     GAsyncResult              *result,
89                                                     GError                   **error);
90 static void     g_output_stream_real_close_async   (GOutputStream             *stream,
91                                                     int                        io_priority,
92                                                     GCancellable              *cancellable,
93                                                     GAsyncReadyCallback        callback,
94                                                     gpointer                   data);
95 static gboolean g_output_stream_real_close_finish  (GOutputStream             *stream,
96                                                     GAsyncResult              *result,
97                                                     GError                   **error);
98 static gboolean _g_output_stream_close_internal    (GOutputStream             *stream,
99                                                     GCancellable              *cancellable,
100                                                     GError                   **error);
101
102 static void
103 g_output_stream_finalize (GObject *object)
104 {
105   G_OBJECT_CLASS (g_output_stream_parent_class)->finalize (object);
106 }
107
108 static void
109 g_output_stream_dispose (GObject *object)
110 {
111   GOutputStream *stream;
112
113   stream = G_OUTPUT_STREAM (object);
114   
115   if (!stream->priv->closed)
116     g_output_stream_close (stream, NULL, NULL);
117
118   G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object);
119 }
120
121 static void
122 g_output_stream_class_init (GOutputStreamClass *klass)
123 {
124   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
125   
126   g_type_class_add_private (klass, sizeof (GOutputStreamPrivate));
127   
128   gobject_class->finalize = g_output_stream_finalize;
129   gobject_class->dispose = g_output_stream_dispose;
130
131   klass->splice = g_output_stream_real_splice;
132   
133   klass->write_async = g_output_stream_real_write_async;
134   klass->write_finish = g_output_stream_real_write_finish;
135   klass->splice_async = g_output_stream_real_splice_async;
136   klass->splice_finish = g_output_stream_real_splice_finish;
137   klass->flush_async = g_output_stream_real_flush_async;
138   klass->flush_finish = g_output_stream_real_flush_finish;
139   klass->close_async = g_output_stream_real_close_async;
140   klass->close_finish = g_output_stream_real_close_finish;
141 }
142
143 static void
144 g_output_stream_init (GOutputStream *stream)
145 {
146   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
147                                               G_TYPE_OUTPUT_STREAM,
148                                               GOutputStreamPrivate);
149 }
150
151 /**
152  * g_output_stream_write:
153  * @stream: a #GOutputStream.
154  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
155  * @count: the number of bytes to write
156  * @cancellable: (allow-none): optional cancellable object
157  * @error: location to store the error occurring, or %NULL to ignore
158  *
159  * Tries to write @count bytes from @buffer into the stream. Will block
160  * during the operation.
161  * 
162  * If count is 0, returns 0 and does nothing. A value of @count
163  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
164  *
165  * On success, the number of bytes written to the stream is returned.
166  * It is not an error if this is not the same as the requested size, as it
167  * can happen e.g. on a partial I/O error, or if there is not enough
168  * storage in the stream. All writes block until at least one byte
169  * is written or an error occurs; 0 is never returned (unless
170  * @count is 0).
171  * 
172  * If @cancellable is not %NULL, then the operation can be cancelled by
173  * triggering the cancellable object from another thread. If the operation
174  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
175  * operation was partially finished when the operation was cancelled the
176  * partial result will be returned, without an error.
177  *
178  * On error -1 is returned and @error is set accordingly.
179  * 
180  * Virtual: write_fn
181  *
182  * Return value: Number of bytes written, or -1 on error
183  **/
184 gssize
185 g_output_stream_write (GOutputStream  *stream,
186                        const void     *buffer,
187                        gsize           count,
188                        GCancellable   *cancellable,
189                        GError        **error)
190 {
191   GOutputStreamClass *class;
192   gssize res;
193
194   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
195   g_return_val_if_fail (buffer != NULL, 0);
196
197   if (count == 0)
198     return 0;
199   
200   if (((gssize) count) < 0)
201     {
202       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
203                    _("Too large count value passed to %s"), G_STRFUNC);
204       return -1;
205     }
206
207   class = G_OUTPUT_STREAM_GET_CLASS (stream);
208
209   if (class->write_fn == NULL) 
210     {
211       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
212                            _("Output stream doesn't implement write"));
213       return -1;
214     }
215   
216   if (!g_output_stream_set_pending (stream, error))
217     return -1;
218   
219   if (cancellable)
220     g_cancellable_push_current (cancellable);
221   
222   res = class->write_fn (stream, buffer, count, cancellable, error);
223   
224   if (cancellable)
225     g_cancellable_pop_current (cancellable);
226   
227   g_output_stream_clear_pending (stream);
228
229   return res; 
230 }
231
232 /**
233  * g_output_stream_write_all:
234  * @stream: a #GOutputStream.
235  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
236  * @count: the number of bytes to write
237  * @bytes_written: (out): location to store the number of bytes that was 
238  *     written to the stream
239  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
240  * @error: location to store the error occurring, or %NULL to ignore
241  *
242  * Tries to write @count bytes from @buffer into the stream. Will block
243  * during the operation.
244  * 
245  * This function is similar to g_output_stream_write(), except it tries to
246  * write as many bytes as requested, only stopping on an error.
247  *
248  * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
249  * is set to @count.
250  * 
251  * If there is an error during the operation %FALSE is returned and @error
252  * is set to indicate the error status, @bytes_written is updated to contain
253  * the number of bytes written into the stream before the error occurred.
254  *
255  * Return value: %TRUE on success, %FALSE if there was an error
256  **/
257 gboolean
258 g_output_stream_write_all (GOutputStream  *stream,
259                            const void     *buffer,
260                            gsize           count,
261                            gsize          *bytes_written,
262                            GCancellable   *cancellable,
263                            GError        **error)
264 {
265   gsize _bytes_written;
266   gssize res;
267
268   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
269   g_return_val_if_fail (buffer != NULL, FALSE);
270
271   _bytes_written = 0;
272   while (_bytes_written < count)
273     {
274       res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
275                                    cancellable, error);
276       if (res == -1)
277         {
278           if (bytes_written)
279             *bytes_written = _bytes_written;
280           return FALSE;
281         }
282       
283       if (res == 0)
284         g_warning ("Write returned zero without error");
285
286       _bytes_written += res;
287     }
288   
289   if (bytes_written)
290     *bytes_written = _bytes_written;
291
292   return TRUE;
293 }
294
295 /**
296  * g_output_stream_write_bytes:
297  * @stream: a #GOutputStream.
298  * @bytes: the #GBytes to write
299  * @cancellable: (allow-none): optional cancellable object
300  * @error: location to store the error occurring, or %NULL to ignore
301  *
302  * Tries to write the data from @bytes into the stream. Will block
303  * during the operation.
304  *
305  * If @bytes is 0-length, returns 0 and does nothing. A #GBytes larger
306  * than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
307  *
308  * On success, the number of bytes written to the stream is returned.
309  * It is not an error if this is not the same as the requested size, as it
310  * can happen e.g. on a partial I/O error, or if there is not enough
311  * storage in the stream. All writes block until at least one byte
312  * is written or an error occurs; 0 is never returned (unless
313  * the size of @bytes is 0).
314  *
315  * If @cancellable is not %NULL, then the operation can be cancelled by
316  * triggering the cancellable object from another thread. If the operation
317  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
318  * operation was partially finished when the operation was cancelled the
319  * partial result will be returned, without an error.
320  *
321  * On error -1 is returned and @error is set accordingly.
322  *
323  * Return value: Number of bytes written, or -1 on error
324  **/
325 gssize
326 g_output_stream_write_bytes (GOutputStream  *stream,
327                              GBytes         *bytes,
328                              GCancellable   *cancellable,
329                              GError        **error)
330 {
331   gsize size;
332   gconstpointer data;
333
334   data = g_bytes_get_data (bytes, &size);
335
336   return g_output_stream_write (stream,
337                                 data, size,
338                                 cancellable,
339                                 error);
340 }
341
342 /**
343  * g_output_stream_flush:
344  * @stream: a #GOutputStream.
345  * @cancellable: (allow-none): optional cancellable object
346  * @error: location to store the error occurring, or %NULL to ignore
347  *
348  * Forces a write of all user-space buffered data for the given
349  * @stream. Will block during the operation. Closing the stream will
350  * implicitly cause a flush.
351  *
352  * This function is optional for inherited classes.
353  * 
354  * If @cancellable is not %NULL, then the operation can be cancelled by
355  * triggering the cancellable object from another thread. If the operation
356  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
357  *
358  * Return value: %TRUE on success, %FALSE on error
359  **/
360 gboolean
361 g_output_stream_flush (GOutputStream  *stream,
362                        GCancellable   *cancellable,
363                        GError        **error)
364 {
365   GOutputStreamClass *class;
366   gboolean res;
367
368   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
369
370   if (!g_output_stream_set_pending (stream, error))
371     return FALSE;
372   
373   class = G_OUTPUT_STREAM_GET_CLASS (stream);
374
375   res = TRUE;
376   if (class->flush)
377     {
378       if (cancellable)
379         g_cancellable_push_current (cancellable);
380       
381       res = class->flush (stream, cancellable, error);
382       
383       if (cancellable)
384         g_cancellable_pop_current (cancellable);
385     }
386   
387   g_output_stream_clear_pending (stream);
388
389   return res;
390 }
391
392 /**
393  * g_output_stream_splice:
394  * @stream: a #GOutputStream.
395  * @source: a #GInputStream.
396  * @flags: a set of #GOutputStreamSpliceFlags.
397  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
398  * @error: a #GError location to store the error occurring, or %NULL to
399  * ignore.
400  *
401  * Splices an input stream into an output stream.
402  *
403  * Returns: a #gssize containing the size of the data spliced, or
404  *     -1 if an error occurred. Note that if the number of bytes
405  *     spliced is greater than %G_MAXSSIZE, then that will be
406  *     returned, and there is no way to determine the actual number
407  *     of bytes spliced.
408  **/
409 gssize
410 g_output_stream_splice (GOutputStream             *stream,
411                         GInputStream              *source,
412                         GOutputStreamSpliceFlags   flags,
413                         GCancellable              *cancellable,
414                         GError                   **error)
415 {
416   GOutputStreamClass *class;
417   gssize bytes_copied;
418
419   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
420   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
421
422   if (g_input_stream_is_closed (source))
423     {
424       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
425                            _("Source stream is already closed"));
426       return -1;
427     }
428
429   if (!g_output_stream_set_pending (stream, error))
430     return -1;
431
432   class = G_OUTPUT_STREAM_GET_CLASS (stream);
433
434   if (cancellable)
435     g_cancellable_push_current (cancellable);
436
437   bytes_copied = class->splice (stream, source, flags, cancellable, error);
438
439   if (cancellable)
440     g_cancellable_pop_current (cancellable);
441
442   g_output_stream_clear_pending (stream);
443
444   return bytes_copied;
445 }
446
447 static gssize
448 g_output_stream_real_splice (GOutputStream             *stream,
449                              GInputStream              *source,
450                              GOutputStreamSpliceFlags   flags,
451                              GCancellable              *cancellable,
452                              GError                   **error)
453 {
454   GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
455   gssize n_read, n_written;
456   gsize bytes_copied;
457   char buffer[8192], *p;
458   gboolean res;
459
460   bytes_copied = 0;
461   if (class->write_fn == NULL)
462     {
463       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
464                            _("Output stream doesn't implement write"));
465       res = FALSE;
466       goto notsupported;
467     }
468
469   res = TRUE;
470   do
471     {
472       n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
473       if (n_read == -1)
474         {
475           res = FALSE;
476           break;
477         }
478
479       if (n_read == 0)
480         break;
481
482       p = buffer;
483       while (n_read > 0)
484         {
485           n_written = class->write_fn (stream, p, n_read, cancellable, error);
486           if (n_written == -1)
487             {
488               res = FALSE;
489               break;
490             }
491
492           p += n_written;
493           n_read -= n_written;
494           bytes_copied += n_written;
495         }
496
497       if (bytes_copied > G_MAXSSIZE)
498         bytes_copied = G_MAXSSIZE;
499     }
500   while (res);
501
502  notsupported:
503   if (!res)
504     error = NULL; /* Ignore further errors */
505
506   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
507     {
508       /* Don't care about errors in source here */
509       g_input_stream_close (source, cancellable, NULL);
510     }
511
512   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
513     {
514       /* But write errors on close are bad! */
515       res = _g_output_stream_close_internal (stream, cancellable, error);
516     }
517
518   if (res)
519     return bytes_copied;
520
521   return -1;
522 }
523
524 /* Must always be called inside
525  * g_output_stream_set_pending()/g_output_stream_clear_pending(). */
526 static gboolean
527 _g_output_stream_close_internal (GOutputStream  *stream,
528                                  GCancellable   *cancellable,
529                                  GError        **error)
530 {
531   GOutputStreamClass *class;
532   gboolean res;
533
534   if (stream->priv->closed)
535     return TRUE;
536
537   class = G_OUTPUT_STREAM_GET_CLASS (stream);
538
539   stream->priv->closing = TRUE;
540
541   if (cancellable)
542     g_cancellable_push_current (cancellable);
543
544   if (class->flush)
545     res = class->flush (stream, cancellable, error);
546   else
547     res = TRUE;
548
549   if (!res)
550     {
551       /* flushing caused the error that we want to return,
552        * but we still want to close the underlying stream if possible
553        */
554       if (class->close_fn)
555         class->close_fn (stream, cancellable, NULL);
556     }
557   else
558     {
559       res = TRUE;
560       if (class->close_fn)
561         res = class->close_fn (stream, cancellable, error);
562     }
563
564   if (cancellable)
565     g_cancellable_pop_current (cancellable);
566
567   stream->priv->closing = FALSE;
568   stream->priv->closed = TRUE;
569
570   return res;
571 }
572
573 /**
574  * g_output_stream_close:
575  * @stream: A #GOutputStream.
576  * @cancellable: (allow-none): optional cancellable object
577  * @error: location to store the error occurring, or %NULL to ignore
578  *
579  * Closes the stream, releasing resources related to it.
580  *
581  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
582  * Closing a stream multiple times will not return an error.
583  *
584  * Closing a stream will automatically flush any outstanding buffers in the
585  * stream.
586  *
587  * Streams will be automatically closed when the last reference
588  * is dropped, but you might want to call this function to make sure 
589  * resources are released as early as possible.
590  *
591  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
592  * open after the stream is closed. See the documentation for the individual
593  * stream for details.
594  *
595  * On failure the first error that happened will be reported, but the close
596  * operation will finish as much as possible. A stream that failed to
597  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
598  * is important to check and report the error to the user, otherwise
599  * there might be a loss of data as all data might not be written.
600  * 
601  * If @cancellable is not %NULL, then the operation can be cancelled by
602  * triggering the cancellable object from another thread. If the operation
603  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
604  * Cancelling a close will still leave the stream closed, but there some streams
605  * can use a faster close that doesn't block to e.g. check errors. On
606  * cancellation (as with any error) there is no guarantee that all written
607  * data will reach the target. 
608  *
609  * Return value: %TRUE on success, %FALSE on failure
610  **/
611 gboolean
612 g_output_stream_close (GOutputStream  *stream,
613                        GCancellable   *cancellable,
614                        GError        **error)
615 {
616   gboolean res;
617
618   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
619
620   if (stream->priv->closed)
621     return TRUE;
622
623   if (!g_output_stream_set_pending (stream, error))
624     return FALSE;
625
626   res = _g_output_stream_close_internal (stream, cancellable, error);
627
628   g_output_stream_clear_pending (stream);
629   
630   return res;
631 }
632
633 static void
634 async_ready_callback_wrapper (GObject      *source_object,
635                               GAsyncResult *res,
636                               gpointer      user_data)
637 {
638   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
639
640   g_output_stream_clear_pending (stream);
641   if (stream->priv->outstanding_callback)
642     (*stream->priv->outstanding_callback) (source_object, res, user_data);
643   g_object_unref (stream);
644 }
645
646 typedef struct {
647   gint io_priority;
648   GCancellable *cancellable;
649   GError *flush_error;
650   gpointer user_data;
651 } CloseUserData;
652
653 static void
654 async_ready_close_callback_wrapper (GObject      *source_object,
655                                     GAsyncResult *res,
656                                     gpointer      user_data)
657 {
658   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
659   CloseUserData *data = user_data;
660
661   stream->priv->closing = FALSE;
662   stream->priv->closed = TRUE;
663
664   g_output_stream_clear_pending (stream);
665
666   if (stream->priv->outstanding_callback)
667     {
668       if (data->flush_error != NULL)
669         {
670           GSimpleAsyncResult *err;
671
672           err = g_simple_async_result_new_take_error (source_object,
673                                                       stream->priv->outstanding_callback,
674                                                       data->user_data,
675                                                       data->flush_error);
676           data->flush_error = NULL;
677
678           (*stream->priv->outstanding_callback) (source_object,
679                                                  G_ASYNC_RESULT (err),
680                                                  data->user_data);
681           g_object_unref (err);
682         }
683       else
684         {
685           (*stream->priv->outstanding_callback) (source_object,
686                                                  res,
687                                                  data->user_data);
688         }
689     }
690
691   g_object_unref (stream);
692
693   if (data->cancellable)
694     g_object_unref (data->cancellable);
695
696   if (data->flush_error)
697     g_error_free (data->flush_error);
698
699   g_slice_free (CloseUserData, data);
700 }
701
702 static void
703 async_ready_close_flushed_callback_wrapper (GObject      *source_object,
704                                             GAsyncResult *res,
705                                             gpointer      user_data)
706 {
707   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
708   GOutputStreamClass *class;
709   CloseUserData *data = user_data;
710   GSimpleAsyncResult *simple;
711
712   /* propagate the possible error */
713   if (G_IS_SIMPLE_ASYNC_RESULT (res))
714     {
715       simple = G_SIMPLE_ASYNC_RESULT (res);
716       g_simple_async_result_propagate_error (simple, &data->flush_error);
717     }
718
719   class = G_OUTPUT_STREAM_GET_CLASS (stream);
720
721   /* we still close, even if there was a flush error */
722   class->close_async (stream, data->io_priority, data->cancellable,
723                       async_ready_close_callback_wrapper, user_data);
724 }
725
726 /**
727  * g_output_stream_write_async:
728  * @stream: A #GOutputStream.
729  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
730  * @count: the number of bytes to write
731  * @io_priority: the io priority of the request.
732  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
733  * @callback: (scope async): callback to call when the request is satisfied
734  * @user_data: (closure): the data to pass to callback function
735  *
736  * Request an asynchronous write of @count bytes from @buffer into 
737  * the stream. When the operation is finished @callback will be called.
738  * You can then call g_output_stream_write_finish() to get the result of the 
739  * operation.
740  *
741  * During an async request no other sync and async calls are allowed, 
742  * and will result in %G_IO_ERROR_PENDING errors. 
743  *
744  * A value of @count larger than %G_MAXSSIZE will cause a 
745  * %G_IO_ERROR_INVALID_ARGUMENT error.
746  *
747  * On success, the number of bytes written will be passed to the
748  * @callback. It is not an error if this is not the same as the 
749  * requested size, as it can happen e.g. on a partial I/O error, 
750  * but generally we try to write as many bytes as requested. 
751  *
752  * You are guaranteed that this method will never fail with
753  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
754  * method will just wait until this changes.
755  *
756  * Any outstanding I/O request with higher priority (lower numerical 
757  * value) will be executed before an outstanding request with lower 
758  * priority. Default priority is %G_PRIORITY_DEFAULT.
759  *
760  * The asyncronous methods have a default fallback that uses threads 
761  * to implement asynchronicity, so they are optional for inheriting 
762  * classes. However, if you override one you must override all.
763  *
764  * For the synchronous, blocking version of this function, see 
765  * g_output_stream_write().
766  **/
767 void
768 g_output_stream_write_async (GOutputStream       *stream,
769                              const void          *buffer,
770                              gsize                count,
771                              int                  io_priority,
772                              GCancellable        *cancellable,
773                              GAsyncReadyCallback  callback,
774                              gpointer             user_data)
775 {
776   GOutputStreamClass *class;
777   GSimpleAsyncResult *simple;
778   GError *error = NULL;
779
780   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
781   g_return_if_fail (buffer != NULL);
782
783   if (count == 0)
784     {
785       simple = g_simple_async_result_new (G_OBJECT (stream),
786                                           callback,
787                                           user_data,
788                                           g_output_stream_write_async);
789       g_simple_async_result_complete_in_idle (simple);
790       g_object_unref (simple);
791       return;
792     }
793
794   if (((gssize) count) < 0)
795     {
796       g_simple_async_report_error_in_idle (G_OBJECT (stream),
797                                            callback,
798                                            user_data,
799                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
800                                            _("Too large count value passed to %s"),
801                                            G_STRFUNC);
802       return;
803     }
804
805   if (!g_output_stream_set_pending (stream, &error))
806     {
807       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
808                                             callback,
809                                             user_data,
810                                             error);
811       return;
812     }
813   
814   class = G_OUTPUT_STREAM_GET_CLASS (stream);
815
816   stream->priv->outstanding_callback = callback;
817   g_object_ref (stream);
818   class->write_async (stream, buffer, count, io_priority, cancellable,
819                       async_ready_callback_wrapper, user_data);
820 }
821
822 /**
823  * g_output_stream_write_finish:
824  * @stream: a #GOutputStream.
825  * @result: a #GAsyncResult.
826  * @error: a #GError location to store the error occurring, or %NULL to 
827  * ignore.
828  * 
829  * Finishes a stream write operation.
830  * 
831  * Returns: a #gssize containing the number of bytes written to the stream.
832  **/
833 gssize
834 g_output_stream_write_finish (GOutputStream  *stream,
835                               GAsyncResult   *result,
836                               GError        **error)
837 {
838   GOutputStreamClass *class;
839
840   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
841   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
842
843   if (g_async_result_legacy_propagate_error (result, error))
844     return -1;
845   else if (g_async_result_is_tagged (result, g_output_stream_write_async))
846     {
847       /* Special case writes of 0 bytes */
848       return 0;
849     }
850   
851   class = G_OUTPUT_STREAM_GET_CLASS (stream);
852   return class->write_finish (stream, result, error);
853 }
854
855 static void
856 write_bytes_callback (GObject      *stream,
857                       GAsyncResult *result,
858                       gpointer      user_data)
859 {
860   GSimpleAsyncResult *simple = user_data;
861   GError *error = NULL;
862   gssize nwrote;
863
864   nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream),
865                                          result, &error);
866   if (nwrote == -1)
867     g_simple_async_result_take_error (simple, error);
868   else
869     g_simple_async_result_set_op_res_gssize (simple, nwrote);
870   g_simple_async_result_complete (simple);
871   g_object_unref (simple);
872 }
873
874 /**
875  * g_output_stream_write_bytes_async:
876  * @stream: A #GOutputStream.
877  * @bytes: The bytes to write
878  * @io_priority: the io priority of the request.
879  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
880  * @callback: (scope async): callback to call when the request is satisfied
881  * @user_data: (closure): the data to pass to callback function
882  *
883  * Request an asynchronous write of the data in @bytes to the stream.
884  * When the operation is finished @callback will be called. You can
885  * then call g_output_stream_write_bytes_finish() to get the result of
886  * the operation.
887  *
888  * During an async request no other sync and async calls are allowed,
889  * and will result in %G_IO_ERROR_PENDING errors.
890  *
891  * A #GBytes larger than %G_MAXSSIZE will cause a
892  * %G_IO_ERROR_INVALID_ARGUMENT error.
893  *
894  * On success, the number of bytes written will be passed to the
895  * @callback. It is not an error if this is not the same as the
896  * requested size, as it can happen e.g. on a partial I/O error,
897  * but generally we try to write as many bytes as requested.
898  *
899  * You are guaranteed that this method will never fail with
900  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
901  * method will just wait until this changes.
902  *
903  * Any outstanding I/O request with higher priority (lower numerical
904  * value) will be executed before an outstanding request with lower
905  * priority. Default priority is %G_PRIORITY_DEFAULT.
906  *
907  * For the synchronous, blocking version of this function, see
908  * g_output_stream_write_bytes().
909  **/
910 void
911 g_output_stream_write_bytes_async (GOutputStream       *stream,
912                                    GBytes              *bytes,
913                                    int                  io_priority,
914                                    GCancellable        *cancellable,
915                                    GAsyncReadyCallback  callback,
916                                    gpointer             user_data)
917 {
918   GSimpleAsyncResult *simple;
919   gsize size;
920   gconstpointer data;
921
922   data = g_bytes_get_data (bytes, &size);
923
924   simple = g_simple_async_result_new (G_OBJECT (stream),
925                                       callback, user_data,
926                                       g_output_stream_write_bytes_async);
927   g_simple_async_result_set_op_res_gpointer (simple, g_bytes_ref (bytes),
928                                              (GDestroyNotify) g_bytes_unref);
929
930   g_output_stream_write_async (stream,
931                                data, size,
932                                io_priority,
933                                cancellable,
934                                write_bytes_callback,
935                                simple);
936 }
937
938 /**
939  * g_output_stream_write_bytes_finish:
940  * @stream: a #GOutputStream.
941  * @result: a #GAsyncResult.
942  * @error: a #GError location to store the error occurring, or %NULL to
943  * ignore.
944  *
945  * Finishes a stream write-from-#GBytes operation.
946  *
947  * Returns: a #gssize containing the number of bytes written to the stream.
948  **/
949 gssize
950 g_output_stream_write_bytes_finish (GOutputStream  *stream,
951                                     GAsyncResult   *result,
952                                     GError        **error)
953 {
954   GSimpleAsyncResult *simple;
955
956   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
957   g_return_val_if_fail (g_simple_async_result_is_valid (result, G_OBJECT (stream), g_output_stream_write_bytes_async), -1);
958
959   simple = G_SIMPLE_ASYNC_RESULT (result);
960   if (g_simple_async_result_propagate_error (simple, error))
961     return -1;
962   return g_simple_async_result_get_op_res_gssize (simple);
963 }
964
965 typedef struct {
966   GInputStream *source;
967   gpointer user_data;
968   GAsyncReadyCallback callback;
969 } SpliceUserData;
970
971 static void
972 async_ready_splice_callback_wrapper (GObject      *source_object,
973                                      GAsyncResult *res,
974                                      gpointer     _data)
975 {
976   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
977   SpliceUserData *data = _data;
978   
979   g_output_stream_clear_pending (stream);
980   
981   if (data->callback)
982     (*data->callback) (source_object, res, data->user_data);
983   
984   g_object_unref (stream);
985   g_object_unref (data->source);
986   g_free (data);
987 }
988
989 /**
990  * g_output_stream_splice_async:
991  * @stream: a #GOutputStream.
992  * @source: a #GInputStream. 
993  * @flags: a set of #GOutputStreamSpliceFlags.
994  * @io_priority: the io priority of the request.
995  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
996  * @callback: (scope async): a #GAsyncReadyCallback. 
997  * @user_data: (closure): user data passed to @callback.
998  * 
999  * Splices a stream asynchronously.
1000  * When the operation is finished @callback will be called.
1001  * You can then call g_output_stream_splice_finish() to get the 
1002  * result of the operation.
1003  *
1004  * For the synchronous, blocking version of this function, see 
1005  * g_output_stream_splice().
1006  **/
1007 void
1008 g_output_stream_splice_async (GOutputStream            *stream,
1009                               GInputStream             *source,
1010                               GOutputStreamSpliceFlags  flags,
1011                               int                       io_priority,
1012                               GCancellable             *cancellable,
1013                               GAsyncReadyCallback       callback,
1014                               gpointer                  user_data)
1015 {
1016   GOutputStreamClass *class;
1017   SpliceUserData *data;
1018   GError *error = NULL;
1019
1020   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1021   g_return_if_fail (G_IS_INPUT_STREAM (source));
1022
1023   if (g_input_stream_is_closed (source))
1024     {
1025       g_simple_async_report_error_in_idle (G_OBJECT (stream),
1026                                            callback,
1027                                            user_data,
1028                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
1029                                            _("Source stream is already closed"));
1030       return;
1031     }
1032   
1033   if (!g_output_stream_set_pending (stream, &error))
1034     {
1035       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
1036                                             callback,
1037                                             user_data,
1038                                             error);
1039       return;
1040     }
1041
1042   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1043
1044   data = g_new0 (SpliceUserData, 1);
1045   data->callback = callback;
1046   data->user_data = user_data;
1047   data->source = g_object_ref (source);
1048   
1049   g_object_ref (stream);
1050   class->splice_async (stream, source, flags, io_priority, cancellable,
1051                       async_ready_splice_callback_wrapper, data);
1052 }
1053
1054 /**
1055  * g_output_stream_splice_finish:
1056  * @stream: a #GOutputStream.
1057  * @result: a #GAsyncResult.
1058  * @error: a #GError location to store the error occurring, or %NULL to 
1059  * ignore.
1060  *
1061  * Finishes an asynchronous stream splice operation.
1062  * 
1063  * Returns: a #gssize of the number of bytes spliced. Note that if the
1064  *     number of bytes spliced is greater than %G_MAXSSIZE, then that
1065  *     will be returned, and there is no way to determine the actual
1066  *     number of bytes spliced.
1067  **/
1068 gssize
1069 g_output_stream_splice_finish (GOutputStream  *stream,
1070                                GAsyncResult   *result,
1071                                GError        **error)
1072 {
1073   GOutputStreamClass *class;
1074
1075   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
1076   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
1077
1078   if (g_async_result_legacy_propagate_error (result, error))
1079     return -1;
1080   
1081   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1082   return class->splice_finish (stream, result, error);
1083 }
1084
1085 /**
1086  * g_output_stream_flush_async:
1087  * @stream: a #GOutputStream.
1088  * @io_priority: the io priority of the request.
1089  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
1090  * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied
1091  * @user_data: (closure): the data to pass to callback function
1092  * 
1093  * Forces an asynchronous write of all user-space buffered data for
1094  * the given @stream.
1095  * For behaviour details see g_output_stream_flush().
1096  *
1097  * When the operation is finished @callback will be 
1098  * called. You can then call g_output_stream_flush_finish() to get the 
1099  * result of the operation.
1100  **/
1101 void
1102 g_output_stream_flush_async (GOutputStream       *stream,
1103                              int                  io_priority,
1104                              GCancellable        *cancellable,
1105                              GAsyncReadyCallback  callback,
1106                              gpointer             user_data)
1107 {
1108   GOutputStreamClass *class;
1109   GSimpleAsyncResult *simple;
1110   GError *error = NULL;
1111
1112   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1113
1114   if (!g_output_stream_set_pending (stream, &error))
1115     {
1116       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
1117                                             callback,
1118                                             user_data,
1119                                             error);
1120       return;
1121     }
1122
1123   stream->priv->outstanding_callback = callback;
1124   g_object_ref (stream);
1125
1126   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1127   
1128   if (class->flush_async == NULL)
1129     {
1130       simple = g_simple_async_result_new (G_OBJECT (stream),
1131                                           async_ready_callback_wrapper,
1132                                           user_data,
1133                                           g_output_stream_flush_async);
1134       g_simple_async_result_complete_in_idle (simple);
1135       g_object_unref (simple);
1136       return;
1137     }
1138       
1139   class->flush_async (stream, io_priority, cancellable,
1140                       async_ready_callback_wrapper, user_data);
1141 }
1142
1143 /**
1144  * g_output_stream_flush_finish:
1145  * @stream: a #GOutputStream.
1146  * @result: a GAsyncResult.
1147  * @error: a #GError location to store the error occurring, or %NULL to 
1148  * ignore.
1149  * 
1150  * Finishes flushing an output stream.
1151  * 
1152  * Returns: %TRUE if flush operation succeeded, %FALSE otherwise.
1153  **/
1154 gboolean
1155 g_output_stream_flush_finish (GOutputStream  *stream,
1156                               GAsyncResult   *result,
1157                               GError        **error)
1158 {
1159   GOutputStreamClass *klass;
1160
1161   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1162   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1163
1164   if (g_async_result_legacy_propagate_error (result, error))
1165     return FALSE;
1166   else if (g_async_result_is_tagged (result, g_output_stream_flush_async))
1167     {
1168       /* Special case default implementation */
1169       return 0;
1170     }
1171
1172   klass = G_OUTPUT_STREAM_GET_CLASS (stream);
1173   return klass->flush_finish (stream, result, error);
1174 }
1175
1176
1177 /**
1178  * g_output_stream_close_async:
1179  * @stream: A #GOutputStream.
1180  * @io_priority: the io priority of the request.
1181  * @cancellable: (allow-none): optional cancellable object
1182  * @callback: (scope async): callback to call when the request is satisfied
1183  * @user_data: (closure): the data to pass to callback function
1184  *
1185  * Requests an asynchronous close of the stream, releasing resources 
1186  * related to it. When the operation is finished @callback will be 
1187  * called. You can then call g_output_stream_close_finish() to get 
1188  * the result of the operation.
1189  *
1190  * For behaviour details see g_output_stream_close().
1191  *
1192  * The asyncronous methods have a default fallback that uses threads 
1193  * to implement asynchronicity, so they are optional for inheriting 
1194  * classes. However, if you override one you must override all.
1195  **/
1196 void
1197 g_output_stream_close_async (GOutputStream       *stream,
1198                              int                  io_priority,
1199                              GCancellable        *cancellable,
1200                              GAsyncReadyCallback  callback,
1201                              gpointer             user_data)
1202 {
1203   GOutputStreamClass *class;
1204   GSimpleAsyncResult *simple;
1205   GError *error = NULL;
1206   CloseUserData *data;
1207
1208   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1209   
1210   if (stream->priv->closed)
1211     {
1212       simple = g_simple_async_result_new (G_OBJECT (stream),
1213                                           callback,
1214                                           user_data,
1215                                           g_output_stream_close_async);
1216       g_simple_async_result_complete_in_idle (simple);
1217       g_object_unref (simple);
1218       return;
1219     }
1220
1221   if (!g_output_stream_set_pending (stream, &error))
1222     {
1223       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
1224                                             callback,
1225                                             user_data,
1226                                             error);
1227       return;
1228     }
1229   
1230   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1231   stream->priv->closing = TRUE;
1232   stream->priv->outstanding_callback = callback;
1233   g_object_ref (stream);
1234
1235   data = g_slice_new0 (CloseUserData);
1236
1237   if (cancellable != NULL)
1238     data->cancellable = g_object_ref (cancellable);
1239
1240   data->io_priority = io_priority;
1241   data->user_data = user_data;
1242
1243   /* Call close_async directly if there is no need to flush, or if the flush
1244      can be done sync (in the output stream async close thread) */
1245   if (class->flush_async == NULL ||
1246       (class->flush_async == g_output_stream_real_flush_async &&
1247        (class->flush == NULL || class->close_async == g_output_stream_real_close_async)))
1248     {
1249       class->close_async (stream, io_priority, cancellable,
1250                           async_ready_close_callback_wrapper, data);
1251     }
1252   else
1253     {
1254       /* First do an async flush, then do the async close in the callback
1255          wrapper (see async_ready_close_flushed_callback_wrapper) */
1256       class->flush_async (stream, io_priority, cancellable,
1257                           async_ready_close_flushed_callback_wrapper, data);
1258     }
1259 }
1260
1261 /**
1262  * g_output_stream_close_finish:
1263  * @stream: a #GOutputStream.
1264  * @result: a #GAsyncResult.
1265  * @error: a #GError location to store the error occurring, or %NULL to 
1266  * ignore.
1267  * 
1268  * Closes an output stream.
1269  * 
1270  * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
1271  **/
1272 gboolean
1273 g_output_stream_close_finish (GOutputStream  *stream,
1274                               GAsyncResult   *result,
1275                               GError        **error)
1276 {
1277   GOutputStreamClass *class;
1278
1279   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1280   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1281
1282   if (g_async_result_legacy_propagate_error (result, error))
1283     return FALSE;
1284   else if (g_async_result_is_tagged (result, g_output_stream_close_async))
1285     {
1286       /* Special case already closed */
1287       return TRUE;
1288     }
1289
1290   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1291   return class->close_finish (stream, result, error);
1292 }
1293
1294 /**
1295  * g_output_stream_is_closed:
1296  * @stream: a #GOutputStream.
1297  * 
1298  * Checks if an output stream has already been closed.
1299  * 
1300  * Returns: %TRUE if @stream is closed. %FALSE otherwise. 
1301  **/
1302 gboolean
1303 g_output_stream_is_closed (GOutputStream *stream)
1304 {
1305   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1306   
1307   return stream->priv->closed;
1308 }
1309
1310 /**
1311  * g_output_stream_is_closing:
1312  * @stream: a #GOutputStream.
1313  *
1314  * Checks if an output stream is being closed. This can be
1315  * used inside e.g. a flush implementation to see if the
1316  * flush (or other i/o operation) is called from within
1317  * the closing operation.
1318  *
1319  * Returns: %TRUE if @stream is being closed. %FALSE otherwise.
1320  *
1321  * Since: 2.24
1322  **/
1323 gboolean
1324 g_output_stream_is_closing (GOutputStream *stream)
1325 {
1326   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1327
1328   return stream->priv->closing;
1329 }
1330
1331 /**
1332  * g_output_stream_has_pending:
1333  * @stream: a #GOutputStream.
1334  * 
1335  * Checks if an ouput stream has pending actions.
1336  * 
1337  * Returns: %TRUE if @stream has pending actions. 
1338  **/
1339 gboolean
1340 g_output_stream_has_pending (GOutputStream *stream)
1341 {
1342   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1343   
1344   return stream->priv->pending;
1345 }
1346
1347 /**
1348  * g_output_stream_set_pending:
1349  * @stream: a #GOutputStream.
1350  * @error: a #GError location to store the error occurring, or %NULL to 
1351  * ignore.
1352  * 
1353  * Sets @stream to have actions pending. If the pending flag is
1354  * already set or @stream is closed, it will return %FALSE and set
1355  * @error.
1356  *
1357  * Return value: %TRUE if pending was previously unset and is now set.
1358  **/
1359 gboolean
1360 g_output_stream_set_pending (GOutputStream *stream,
1361                              GError **error)
1362 {
1363   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1364   
1365   if (stream->priv->closed)
1366     {
1367       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1368                            _("Stream is already closed"));
1369       return FALSE;
1370     }
1371   
1372   if (stream->priv->pending)
1373     {
1374       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1375                            /* Translators: This is an error you get if there is
1376                             * already an operation running against this stream when
1377                             * you try to start one */
1378                            _("Stream has outstanding operation"));
1379       return FALSE;
1380     }
1381   
1382   stream->priv->pending = TRUE;
1383   return TRUE;
1384 }
1385
1386 /**
1387  * g_output_stream_clear_pending:
1388  * @stream: output stream
1389  * 
1390  * Clears the pending flag on @stream.
1391  **/
1392 void
1393 g_output_stream_clear_pending (GOutputStream *stream)
1394 {
1395   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1396   
1397   stream->priv->pending = FALSE;
1398 }
1399
1400
1401 /********************************************
1402  *   Default implementation of async ops    *
1403  ********************************************/
1404
1405 typedef struct {
1406   const void         *buffer;
1407   gsize               count_requested;
1408   gssize              count_written;
1409
1410   GCancellable       *cancellable;
1411   gint                io_priority;
1412   gboolean            need_idle;
1413 } WriteData;
1414
1415 static void
1416 free_write_data (WriteData *op)
1417 {
1418   if (op->cancellable)
1419     g_object_unref (op->cancellable);
1420   g_slice_free (WriteData, op);
1421 }
1422
1423 static void
1424 write_async_thread (GSimpleAsyncResult *res,
1425                     GObject            *object,
1426                     GCancellable       *cancellable)
1427 {
1428   WriteData *op;
1429   GOutputStreamClass *class;
1430   GError *error = NULL;
1431
1432   class = G_OUTPUT_STREAM_GET_CLASS (object);
1433   op = g_simple_async_result_get_op_res_gpointer (res);
1434   op->count_written = class->write_fn (G_OUTPUT_STREAM (object), op->buffer, op->count_requested,
1435                                        cancellable, &error);
1436   if (op->count_written == -1)
1437     g_simple_async_result_take_error (res, error);
1438 }
1439
1440 static void write_async_pollable (GPollableOutputStream *stream,
1441                                   GSimpleAsyncResult    *result);
1442
1443 static gboolean
1444 write_async_pollable_ready (GPollableOutputStream *stream,
1445                             gpointer               user_data)
1446 {
1447   GSimpleAsyncResult *result = user_data;
1448
1449   write_async_pollable (stream, result);
1450   return FALSE;
1451 }
1452
1453 static void
1454 write_async_pollable (GPollableOutputStream *stream,
1455                       GSimpleAsyncResult    *result)
1456 {
1457   GError *error = NULL;
1458   WriteData *op = g_simple_async_result_get_op_res_gpointer (result);
1459
1460   if (g_cancellable_set_error_if_cancelled (op->cancellable, &error))
1461     op->count_written = -1;
1462   else
1463     {
1464       op->count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
1465         write_nonblocking (stream, op->buffer, op->count_requested, &error);
1466     }
1467
1468   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1469     {
1470       GSource *source;
1471
1472       g_error_free (error);
1473       op->need_idle = FALSE;
1474
1475       source = g_pollable_output_stream_create_source (stream, op->cancellable);
1476       g_source_set_callback (source,
1477                              (GSourceFunc) write_async_pollable_ready,
1478                              g_object_ref (result), g_object_unref);
1479       g_source_set_priority (source, op->io_priority);
1480       g_source_attach (source, g_main_context_get_thread_default ());
1481       g_source_unref (source);
1482       return;
1483     }
1484
1485   if (op->count_written == -1)
1486     g_simple_async_result_take_error (result, error);
1487
1488   if (op->need_idle)
1489     g_simple_async_result_complete_in_idle (result);
1490   else
1491     g_simple_async_result_complete (result);
1492 }
1493
1494 static void
1495 g_output_stream_real_write_async (GOutputStream       *stream,
1496                                   const void          *buffer,
1497                                   gsize                count,
1498                                   int                  io_priority,
1499                                   GCancellable        *cancellable,
1500                                   GAsyncReadyCallback  callback,
1501                                   gpointer             user_data)
1502 {
1503   GSimpleAsyncResult *res;
1504   WriteData *op;
1505
1506   op = g_slice_new0 (WriteData);
1507   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async);
1508   g_simple_async_result_set_op_res_gpointer (res, op, (GDestroyNotify) free_write_data);
1509   op->buffer = buffer;
1510   op->count_requested = count;
1511   op->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
1512   op->io_priority = io_priority;
1513   op->need_idle = TRUE;
1514   
1515   if (G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
1516       g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream)))
1517     write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), res);
1518   else
1519     g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable);
1520   g_object_unref (res);
1521 }
1522
1523 static gssize
1524 g_output_stream_real_write_finish (GOutputStream  *stream,
1525                                    GAsyncResult   *result,
1526                                    GError        **error)
1527 {
1528   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1529   WriteData *op;
1530
1531   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_write_async);
1532
1533   if (g_simple_async_result_propagate_error (simple, error))
1534     return -1;
1535
1536   op = g_simple_async_result_get_op_res_gpointer (simple);
1537   return op->count_written;
1538 }
1539
1540 typedef struct {
1541   GInputStream *source;
1542   GOutputStreamSpliceFlags flags;
1543   gssize bytes_copied;
1544 } SpliceData;
1545
1546 static void
1547 splice_async_thread (GSimpleAsyncResult *result,
1548                      GObject            *object,
1549                      GCancellable       *cancellable)
1550 {
1551   SpliceData *op;
1552   GOutputStreamClass *class;
1553   GError *error = NULL;
1554   GOutputStream *stream;
1555
1556   stream = G_OUTPUT_STREAM (object);
1557   class = G_OUTPUT_STREAM_GET_CLASS (object);
1558   op = g_simple_async_result_get_op_res_gpointer (result);
1559   
1560   op->bytes_copied = class->splice (stream,
1561                                     op->source,
1562                                     op->flags,
1563                                     cancellable,
1564                                     &error);
1565   if (op->bytes_copied == -1)
1566     g_simple_async_result_take_error (result, error);
1567 }
1568
1569 static void
1570 g_output_stream_real_splice_async (GOutputStream             *stream,
1571                                    GInputStream              *source,
1572                                    GOutputStreamSpliceFlags   flags,
1573                                    int                        io_priority,
1574                                    GCancellable              *cancellable,
1575                                    GAsyncReadyCallback        callback,
1576                                    gpointer                   user_data)
1577 {
1578   GSimpleAsyncResult *res;
1579   SpliceData *op;
1580
1581   op = g_new0 (SpliceData, 1);
1582   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_splice_async);
1583   g_simple_async_result_set_op_res_gpointer (res, op, g_free);
1584   op->flags = flags;
1585   op->source = source;
1586
1587   /* TODO: In the case where both source and destintion have
1588      non-threadbased async calls we can use a true async copy here */
1589   
1590   g_simple_async_result_run_in_thread (res, splice_async_thread, io_priority, cancellable);
1591   g_object_unref (res);
1592 }
1593
1594 static gssize
1595 g_output_stream_real_splice_finish (GOutputStream  *stream,
1596                                     GAsyncResult   *result,
1597                                     GError        **error)
1598 {
1599   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1600   SpliceData *op;
1601
1602   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_splice_async);
1603
1604   if (g_simple_async_result_propagate_error (simple, error))
1605     return -1;
1606
1607   op = g_simple_async_result_get_op_res_gpointer (simple);
1608   return op->bytes_copied;
1609 }
1610
1611
1612 static void
1613 flush_async_thread (GSimpleAsyncResult *res,
1614                     GObject            *object,
1615                     GCancellable       *cancellable)
1616 {
1617   GOutputStreamClass *class;
1618   gboolean result;
1619   GError *error = NULL;
1620
1621   class = G_OUTPUT_STREAM_GET_CLASS (object);
1622   result = TRUE;
1623   if (class->flush)
1624     result = class->flush (G_OUTPUT_STREAM (object), cancellable, &error);
1625
1626   if (!result)
1627     g_simple_async_result_take_error (res, error);
1628 }
1629
1630 static void
1631 g_output_stream_real_flush_async (GOutputStream       *stream,
1632                                   int                  io_priority,
1633                                   GCancellable        *cancellable,
1634                                   GAsyncReadyCallback  callback,
1635                                   gpointer             user_data)
1636 {
1637   GSimpleAsyncResult *res;
1638
1639   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async);
1640   
1641   g_simple_async_result_run_in_thread (res, flush_async_thread, io_priority, cancellable);
1642   g_object_unref (res);
1643 }
1644
1645 static gboolean
1646 g_output_stream_real_flush_finish (GOutputStream  *stream,
1647                                    GAsyncResult   *result,
1648                                    GError        **error)
1649 {
1650   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1651
1652   if (g_simple_async_result_propagate_error (simple, error))
1653     return FALSE;
1654   return TRUE;
1655 }
1656
1657 static void
1658 close_async_thread (GSimpleAsyncResult *res,
1659                     GObject            *object,
1660                     GCancellable       *cancellable)
1661 {
1662   GOutputStreamClass *class;
1663   GError *error = NULL;
1664   gboolean result = TRUE;
1665
1666   class = G_OUTPUT_STREAM_GET_CLASS (object);
1667
1668   /* Do a flush here if there is a flush function, and we did not have to do
1669      an async flush before (see g_output_stream_close_async) */
1670   if (class->flush != NULL &&
1671       (class->flush_async == NULL ||
1672        class->flush_async == g_output_stream_real_flush_async))
1673     {
1674       result = class->flush (G_OUTPUT_STREAM (object), cancellable, &error);
1675     }
1676
1677   /* Auto handling of cancelation disabled, and ignore
1678      cancellation, since we want to close things anyway, although
1679      possibly in a quick-n-dirty way. At least we never want to leak
1680      open handles */
1681   
1682   if (class->close_fn)
1683     {
1684       /* Make sure to close, even if the flush failed (see sync close) */
1685       if (!result)
1686         class->close_fn (G_OUTPUT_STREAM (object), cancellable, NULL);
1687       else
1688         result = class->close_fn (G_OUTPUT_STREAM (object), cancellable, &error);
1689
1690       if (!result)
1691         g_simple_async_result_take_error (res, error);
1692     }
1693 }
1694
1695 static void
1696 g_output_stream_real_close_async (GOutputStream       *stream,
1697                                   int                  io_priority,
1698                                   GCancellable        *cancellable,
1699                                   GAsyncReadyCallback  callback,
1700                                   gpointer             user_data)
1701 {
1702   GSimpleAsyncResult *res;
1703   
1704   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_close_async);
1705
1706   g_simple_async_result_set_handle_cancellation (res, FALSE);
1707   
1708   g_simple_async_result_run_in_thread (res, close_async_thread, io_priority, cancellable);
1709   g_object_unref (res);
1710 }
1711
1712 static gboolean
1713 g_output_stream_real_close_finish (GOutputStream  *stream,
1714                                    GAsyncResult   *result,
1715                                    GError        **error)
1716 {
1717   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1718
1719   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_close_async);
1720
1721   if (g_simple_async_result_propagate_error (simple, error))
1722     return FALSE;
1723   return TRUE;
1724 }