gio: Add g_async_result_legacy_propagate_error()
[platform/upstream/glib.git] / gio / goutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24 #include "goutputstream.h"
25 #include "gcancellable.h"
26 #include "gasyncresult.h"
27 #include "gsimpleasyncresult.h"
28 #include "ginputstream.h"
29 #include "gioerror.h"
30 #include "glibintl.h"
31 #include "gpollableoutputstream.h"
32
33 /**
34  * SECTION:goutputstream
35  * @short_description: Base class for implementing streaming output
36  * @include: gio/gio.h
37  *
38  * #GOutputStream has functions to write to a stream (g_output_stream_write()),
39  * to close a stream (g_output_stream_close()) and to flush pending writes
40  * (g_output_stream_flush()). 
41  *
42  * To copy the content of an input stream to an output stream without 
43  * manually handling the reads and writes, use g_output_stream_splice(). 
44  *
45  * All of these functions have async variants too.
46  **/
47
48 G_DEFINE_ABSTRACT_TYPE (GOutputStream, g_output_stream, G_TYPE_OBJECT);
49
50 struct _GOutputStreamPrivate {
51   guint closed : 1;
52   guint pending : 1;
53   guint closing : 1;
54   GAsyncReadyCallback outstanding_callback;
55 };
56
57 static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
58                                                     GInputStream              *source,
59                                                     GOutputStreamSpliceFlags   flags,
60                                                     GCancellable              *cancellable,
61                                                     GError                   **error);
62 static void     g_output_stream_real_write_async   (GOutputStream             *stream,
63                                                     const void                *buffer,
64                                                     gsize                      count,
65                                                     int                        io_priority,
66                                                     GCancellable              *cancellable,
67                                                     GAsyncReadyCallback        callback,
68                                                     gpointer                   data);
69 static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
70                                                     GAsyncResult              *result,
71                                                     GError                   **error);
72 static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
73                                                     GInputStream              *source,
74                                                     GOutputStreamSpliceFlags   flags,
75                                                     int                        io_priority,
76                                                     GCancellable              *cancellable,
77                                                     GAsyncReadyCallback        callback,
78                                                     gpointer                   data);
79 static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
80                                                     GAsyncResult              *result,
81                                                     GError                   **error);
82 static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
83                                                     int                        io_priority,
84                                                     GCancellable              *cancellable,
85                                                     GAsyncReadyCallback        callback,
86                                                     gpointer                   data);
87 static gboolean g_output_stream_real_flush_finish  (GOutputStream             *stream,
88                                                     GAsyncResult              *result,
89                                                     GError                   **error);
90 static void     g_output_stream_real_close_async   (GOutputStream             *stream,
91                                                     int                        io_priority,
92                                                     GCancellable              *cancellable,
93                                                     GAsyncReadyCallback        callback,
94                                                     gpointer                   data);
95 static gboolean g_output_stream_real_close_finish  (GOutputStream             *stream,
96                                                     GAsyncResult              *result,
97                                                     GError                   **error);
98 static gboolean _g_output_stream_close_internal    (GOutputStream             *stream,
99                                                     GCancellable              *cancellable,
100                                                     GError                   **error);
101
102 static void
103 g_output_stream_finalize (GObject *object)
104 {
105   G_OBJECT_CLASS (g_output_stream_parent_class)->finalize (object);
106 }
107
108 static void
109 g_output_stream_dispose (GObject *object)
110 {
111   GOutputStream *stream;
112
113   stream = G_OUTPUT_STREAM (object);
114   
115   if (!stream->priv->closed)
116     g_output_stream_close (stream, NULL, NULL);
117
118   G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object);
119 }
120
121 static void
122 g_output_stream_class_init (GOutputStreamClass *klass)
123 {
124   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
125   
126   g_type_class_add_private (klass, sizeof (GOutputStreamPrivate));
127   
128   gobject_class->finalize = g_output_stream_finalize;
129   gobject_class->dispose = g_output_stream_dispose;
130
131   klass->splice = g_output_stream_real_splice;
132   
133   klass->write_async = g_output_stream_real_write_async;
134   klass->write_finish = g_output_stream_real_write_finish;
135   klass->splice_async = g_output_stream_real_splice_async;
136   klass->splice_finish = g_output_stream_real_splice_finish;
137   klass->flush_async = g_output_stream_real_flush_async;
138   klass->flush_finish = g_output_stream_real_flush_finish;
139   klass->close_async = g_output_stream_real_close_async;
140   klass->close_finish = g_output_stream_real_close_finish;
141 }
142
143 static void
144 g_output_stream_init (GOutputStream *stream)
145 {
146   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
147                                               G_TYPE_OUTPUT_STREAM,
148                                               GOutputStreamPrivate);
149 }
150
151 /**
152  * g_output_stream_write:
153  * @stream: a #GOutputStream.
154  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
155  * @count: the number of bytes to write
156  * @cancellable: (allow-none): optional cancellable object
157  * @error: location to store the error occurring, or %NULL to ignore
158  *
159  * Tries to write @count bytes from @buffer into the stream. Will block
160  * during the operation.
161  * 
162  * If count is 0, returns 0 and does nothing. A value of @count
163  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
164  *
165  * On success, the number of bytes written to the stream is returned.
166  * It is not an error if this is not the same as the requested size, as it
167  * can happen e.g. on a partial I/O error, or if there is not enough
168  * storage in the stream. All writes block until at least one byte
169  * is written or an error occurs; 0 is never returned (unless
170  * @count is 0).
171  * 
172  * If @cancellable is not %NULL, then the operation can be cancelled by
173  * triggering the cancellable object from another thread. If the operation
174  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
175  * operation was partially finished when the operation was cancelled the
176  * partial result will be returned, without an error.
177  *
178  * On error -1 is returned and @error is set accordingly.
179  * 
180  * Virtual: write_fn
181  *
182  * Return value: Number of bytes written, or -1 on error
183  **/
184 gssize
185 g_output_stream_write (GOutputStream  *stream,
186                        const void     *buffer,
187                        gsize           count,
188                        GCancellable   *cancellable,
189                        GError        **error)
190 {
191   GOutputStreamClass *class;
192   gssize res;
193
194   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
195   g_return_val_if_fail (buffer != NULL, 0);
196
197   if (count == 0)
198     return 0;
199   
200   if (((gssize) count) < 0)
201     {
202       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
203                    _("Too large count value passed to %s"), G_STRFUNC);
204       return -1;
205     }
206
207   class = G_OUTPUT_STREAM_GET_CLASS (stream);
208
209   if (class->write_fn == NULL) 
210     {
211       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
212                            _("Output stream doesn't implement write"));
213       return -1;
214     }
215   
216   if (!g_output_stream_set_pending (stream, error))
217     return -1;
218   
219   if (cancellable)
220     g_cancellable_push_current (cancellable);
221   
222   res = class->write_fn (stream, buffer, count, cancellable, error);
223   
224   if (cancellable)
225     g_cancellable_pop_current (cancellable);
226   
227   g_output_stream_clear_pending (stream);
228
229   return res; 
230 }
231
232 /**
233  * g_output_stream_write_all:
234  * @stream: a #GOutputStream.
235  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
236  * @count: the number of bytes to write
237  * @bytes_written: (out): location to store the number of bytes that was 
238  *     written to the stream
239  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
240  * @error: location to store the error occurring, or %NULL to ignore
241  *
242  * Tries to write @count bytes from @buffer into the stream. Will block
243  * during the operation.
244  * 
245  * This function is similar to g_output_stream_write(), except it tries to
246  * write as many bytes as requested, only stopping on an error.
247  *
248  * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
249  * is set to @count.
250  * 
251  * If there is an error during the operation %FALSE is returned and @error
252  * is set to indicate the error status, @bytes_written is updated to contain
253  * the number of bytes written into the stream before the error occurred.
254  *
255  * Return value: %TRUE on success, %FALSE if there was an error
256  **/
257 gboolean
258 g_output_stream_write_all (GOutputStream  *stream,
259                            const void     *buffer,
260                            gsize           count,
261                            gsize          *bytes_written,
262                            GCancellable   *cancellable,
263                            GError        **error)
264 {
265   gsize _bytes_written;
266   gssize res;
267
268   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
269   g_return_val_if_fail (buffer != NULL, FALSE);
270
271   _bytes_written = 0;
272   while (_bytes_written < count)
273     {
274       res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
275                                    cancellable, error);
276       if (res == -1)
277         {
278           if (bytes_written)
279             *bytes_written = _bytes_written;
280           return FALSE;
281         }
282       
283       if (res == 0)
284         g_warning ("Write returned zero without error");
285
286       _bytes_written += res;
287     }
288   
289   if (bytes_written)
290     *bytes_written = _bytes_written;
291
292   return TRUE;
293 }
294
295 /**
296  * g_output_stream_write_bytes:
297  * @stream: a #GOutputStream.
298  * @bytes: the #GBytes to write
299  * @cancellable: (allow-none): optional cancellable object
300  * @error: location to store the error occurring, or %NULL to ignore
301  *
302  * Tries to write the data from @bytes into the stream. Will block
303  * during the operation.
304  *
305  * If @bytes is 0-length, returns 0 and does nothing. A #GBytes larger
306  * than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
307  *
308  * On success, the number of bytes written to the stream is returned.
309  * It is not an error if this is not the same as the requested size, as it
310  * can happen e.g. on a partial I/O error, or if there is not enough
311  * storage in the stream. All writes block until at least one byte
312  * is written or an error occurs; 0 is never returned (unless
313  * the size of @bytes is 0).
314  *
315  * If @cancellable is not %NULL, then the operation can be cancelled by
316  * triggering the cancellable object from another thread. If the operation
317  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
318  * operation was partially finished when the operation was cancelled the
319  * partial result will be returned, without an error.
320  *
321  * On error -1 is returned and @error is set accordingly.
322  *
323  * Return value: Number of bytes written, or -1 on error
324  **/
325 gssize
326 g_output_stream_write_bytes (GOutputStream  *stream,
327                              GBytes         *bytes,
328                              GCancellable   *cancellable,
329                              GError        **error)
330 {
331   gsize size;
332   gconstpointer data;
333
334   data = g_bytes_get_data (bytes, &size);
335
336   return g_output_stream_write (stream,
337                                 data, size,
338                                 cancellable,
339                                 error);
340 }
341
342 /**
343  * g_output_stream_flush:
344  * @stream: a #GOutputStream.
345  * @cancellable: (allow-none): optional cancellable object
346  * @error: location to store the error occurring, or %NULL to ignore
347  *
348  * Forces a write of all user-space buffered data for the given
349  * @stream. Will block during the operation. Closing the stream will
350  * implicitly cause a flush.
351  *
352  * This function is optional for inherited classes.
353  * 
354  * If @cancellable is not %NULL, then the operation can be cancelled by
355  * triggering the cancellable object from another thread. If the operation
356  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
357  *
358  * Return value: %TRUE on success, %FALSE on error
359  **/
360 gboolean
361 g_output_stream_flush (GOutputStream  *stream,
362                        GCancellable   *cancellable,
363                        GError        **error)
364 {
365   GOutputStreamClass *class;
366   gboolean res;
367
368   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
369
370   if (!g_output_stream_set_pending (stream, error))
371     return FALSE;
372   
373   class = G_OUTPUT_STREAM_GET_CLASS (stream);
374
375   res = TRUE;
376   if (class->flush)
377     {
378       if (cancellable)
379         g_cancellable_push_current (cancellable);
380       
381       res = class->flush (stream, cancellable, error);
382       
383       if (cancellable)
384         g_cancellable_pop_current (cancellable);
385     }
386   
387   g_output_stream_clear_pending (stream);
388
389   return res;
390 }
391
392 /**
393  * g_output_stream_splice:
394  * @stream: a #GOutputStream.
395  * @source: a #GInputStream.
396  * @flags: a set of #GOutputStreamSpliceFlags.
397  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
398  * @error: a #GError location to store the error occurring, or %NULL to
399  * ignore.
400  *
401  * Splices an input stream into an output stream.
402  *
403  * Returns: a #gssize containing the size of the data spliced, or
404  *     -1 if an error occurred. Note that if the number of bytes
405  *     spliced is greater than %G_MAXSSIZE, then that will be
406  *     returned, and there is no way to determine the actual number
407  *     of bytes spliced.
408  **/
409 gssize
410 g_output_stream_splice (GOutputStream             *stream,
411                         GInputStream              *source,
412                         GOutputStreamSpliceFlags   flags,
413                         GCancellable              *cancellable,
414                         GError                   **error)
415 {
416   GOutputStreamClass *class;
417   gssize bytes_copied;
418
419   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
420   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
421
422   if (g_input_stream_is_closed (source))
423     {
424       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
425                            _("Source stream is already closed"));
426       return -1;
427     }
428
429   if (!g_output_stream_set_pending (stream, error))
430     return -1;
431
432   class = G_OUTPUT_STREAM_GET_CLASS (stream);
433
434   if (cancellable)
435     g_cancellable_push_current (cancellable);
436
437   bytes_copied = class->splice (stream, source, flags, cancellable, error);
438
439   if (cancellable)
440     g_cancellable_pop_current (cancellable);
441
442   g_output_stream_clear_pending (stream);
443
444   return bytes_copied;
445 }
446
447 static gssize
448 g_output_stream_real_splice (GOutputStream             *stream,
449                              GInputStream              *source,
450                              GOutputStreamSpliceFlags   flags,
451                              GCancellable              *cancellable,
452                              GError                   **error)
453 {
454   GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
455   gssize n_read, n_written;
456   gsize bytes_copied;
457   char buffer[8192], *p;
458   gboolean res;
459
460   bytes_copied = 0;
461   if (class->write_fn == NULL)
462     {
463       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
464                            _("Output stream doesn't implement write"));
465       res = FALSE;
466       goto notsupported;
467     }
468
469   res = TRUE;
470   do
471     {
472       n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
473       if (n_read == -1)
474         {
475           res = FALSE;
476           break;
477         }
478
479       if (n_read == 0)
480         break;
481
482       p = buffer;
483       while (n_read > 0)
484         {
485           n_written = class->write_fn (stream, p, n_read, cancellable, error);
486           if (n_written == -1)
487             {
488               res = FALSE;
489               break;
490             }
491
492           p += n_written;
493           n_read -= n_written;
494           bytes_copied += n_written;
495         }
496
497       if (bytes_copied > G_MAXSSIZE)
498         bytes_copied = G_MAXSSIZE;
499     }
500   while (res);
501
502  notsupported:
503   if (!res)
504     error = NULL; /* Ignore further errors */
505
506   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
507     {
508       /* Don't care about errors in source here */
509       g_input_stream_close (source, cancellable, NULL);
510     }
511
512   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
513     {
514       /* But write errors on close are bad! */
515       res = _g_output_stream_close_internal (stream, cancellable, error);
516     }
517
518   if (res)
519     return bytes_copied;
520
521   return -1;
522 }
523
524 /* Must always be called inside
525  * g_output_stream_set_pending()/g_output_stream_clear_pending(). */
526 static gboolean
527 _g_output_stream_close_internal (GOutputStream  *stream,
528                                  GCancellable   *cancellable,
529                                  GError        **error)
530 {
531   GOutputStreamClass *class;
532   gboolean res;
533
534   if (stream->priv->closed)
535     return TRUE;
536
537   class = G_OUTPUT_STREAM_GET_CLASS (stream);
538
539   stream->priv->closing = TRUE;
540
541   if (cancellable)
542     g_cancellable_push_current (cancellable);
543
544   if (class->flush)
545     res = class->flush (stream, cancellable, error);
546   else
547     res = TRUE;
548
549   if (!res)
550     {
551       /* flushing caused the error that we want to return,
552        * but we still want to close the underlying stream if possible
553        */
554       if (class->close_fn)
555         class->close_fn (stream, cancellable, NULL);
556     }
557   else
558     {
559       res = TRUE;
560       if (class->close_fn)
561         res = class->close_fn (stream, cancellable, error);
562     }
563
564   if (cancellable)
565     g_cancellable_pop_current (cancellable);
566
567   stream->priv->closing = FALSE;
568   stream->priv->closed = TRUE;
569
570   return res;
571 }
572
573 /**
574  * g_output_stream_close:
575  * @stream: A #GOutputStream.
576  * @cancellable: (allow-none): optional cancellable object
577  * @error: location to store the error occurring, or %NULL to ignore
578  *
579  * Closes the stream, releasing resources related to it.
580  *
581  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
582  * Closing a stream multiple times will not return an error.
583  *
584  * Closing a stream will automatically flush any outstanding buffers in the
585  * stream.
586  *
587  * Streams will be automatically closed when the last reference
588  * is dropped, but you might want to call this function to make sure 
589  * resources are released as early as possible.
590  *
591  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
592  * open after the stream is closed. See the documentation for the individual
593  * stream for details.
594  *
595  * On failure the first error that happened will be reported, but the close
596  * operation will finish as much as possible. A stream that failed to
597  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
598  * is important to check and report the error to the user, otherwise
599  * there might be a loss of data as all data might not be written.
600  * 
601  * If @cancellable is not %NULL, then the operation can be cancelled by
602  * triggering the cancellable object from another thread. If the operation
603  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
604  * Cancelling a close will still leave the stream closed, but there some streams
605  * can use a faster close that doesn't block to e.g. check errors. On
606  * cancellation (as with any error) there is no guarantee that all written
607  * data will reach the target. 
608  *
609  * Return value: %TRUE on success, %FALSE on failure
610  **/
611 gboolean
612 g_output_stream_close (GOutputStream  *stream,
613                        GCancellable   *cancellable,
614                        GError        **error)
615 {
616   gboolean res;
617
618   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
619
620   if (stream->priv->closed)
621     return TRUE;
622
623   if (!g_output_stream_set_pending (stream, error))
624     return FALSE;
625
626   res = _g_output_stream_close_internal (stream, cancellable, error);
627
628   g_output_stream_clear_pending (stream);
629   
630   return res;
631 }
632
633 static void
634 async_ready_callback_wrapper (GObject      *source_object,
635                               GAsyncResult *res,
636                               gpointer      user_data)
637 {
638   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
639
640   g_output_stream_clear_pending (stream);
641   if (stream->priv->outstanding_callback)
642     (*stream->priv->outstanding_callback) (source_object, res, user_data);
643   g_object_unref (stream);
644 }
645
646 typedef struct {
647   gint io_priority;
648   GCancellable *cancellable;
649   GError *flush_error;
650   gpointer user_data;
651 } CloseUserData;
652
653 static void
654 async_ready_close_callback_wrapper (GObject      *source_object,
655                                     GAsyncResult *res,
656                                     gpointer      user_data)
657 {
658   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
659   CloseUserData *data = user_data;
660
661   stream->priv->closing = FALSE;
662   stream->priv->closed = TRUE;
663
664   g_output_stream_clear_pending (stream);
665
666   if (stream->priv->outstanding_callback)
667     {
668       if (data->flush_error != NULL)
669         {
670           GSimpleAsyncResult *err;
671
672           err = g_simple_async_result_new_take_error (source_object,
673                                                       stream->priv->outstanding_callback,
674                                                       data->user_data,
675                                                       data->flush_error);
676           data->flush_error = NULL;
677
678           (*stream->priv->outstanding_callback) (source_object,
679                                                  G_ASYNC_RESULT (err),
680                                                  data->user_data);
681           g_object_unref (err);
682         }
683       else
684         {
685           (*stream->priv->outstanding_callback) (source_object,
686                                                  res,
687                                                  data->user_data);
688         }
689     }
690
691   g_object_unref (stream);
692
693   if (data->cancellable)
694     g_object_unref (data->cancellable);
695
696   if (data->flush_error)
697     g_error_free (data->flush_error);
698
699   g_slice_free (CloseUserData, data);
700 }
701
702 static void
703 async_ready_close_flushed_callback_wrapper (GObject      *source_object,
704                                             GAsyncResult *res,
705                                             gpointer      user_data)
706 {
707   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
708   GOutputStreamClass *class;
709   CloseUserData *data = user_data;
710   GSimpleAsyncResult *simple;
711
712   /* propagate the possible error */
713   if (G_IS_SIMPLE_ASYNC_RESULT (res))
714     {
715       simple = G_SIMPLE_ASYNC_RESULT (res);
716       g_simple_async_result_propagate_error (simple, &data->flush_error);
717     }
718
719   class = G_OUTPUT_STREAM_GET_CLASS (stream);
720
721   /* we still close, even if there was a flush error */
722   class->close_async (stream, data->io_priority, data->cancellable,
723                       async_ready_close_callback_wrapper, user_data);
724 }
725
726 /**
727  * g_output_stream_write_async:
728  * @stream: A #GOutputStream.
729  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
730  * @count: the number of bytes to write
731  * @io_priority: the io priority of the request.
732  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
733  * @callback: (scope async): callback to call when the request is satisfied
734  * @user_data: (closure): the data to pass to callback function
735  *
736  * Request an asynchronous write of @count bytes from @buffer into 
737  * the stream. When the operation is finished @callback will be called.
738  * You can then call g_output_stream_write_finish() to get the result of the 
739  * operation.
740  *
741  * During an async request no other sync and async calls are allowed, 
742  * and will result in %G_IO_ERROR_PENDING errors. 
743  *
744  * A value of @count larger than %G_MAXSSIZE will cause a 
745  * %G_IO_ERROR_INVALID_ARGUMENT error.
746  *
747  * On success, the number of bytes written will be passed to the
748  * @callback. It is not an error if this is not the same as the 
749  * requested size, as it can happen e.g. on a partial I/O error, 
750  * but generally we try to write as many bytes as requested. 
751  *
752  * You are guaranteed that this method will never fail with
753  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
754  * method will just wait until this changes.
755  *
756  * Any outstanding I/O request with higher priority (lower numerical 
757  * value) will be executed before an outstanding request with lower 
758  * priority. Default priority is %G_PRIORITY_DEFAULT.
759  *
760  * The asyncronous methods have a default fallback that uses threads 
761  * to implement asynchronicity, so they are optional for inheriting 
762  * classes. However, if you override one you must override all.
763  *
764  * For the synchronous, blocking version of this function, see 
765  * g_output_stream_write().
766  **/
767 void
768 g_output_stream_write_async (GOutputStream       *stream,
769                              const void          *buffer,
770                              gsize                count,
771                              int                  io_priority,
772                              GCancellable        *cancellable,
773                              GAsyncReadyCallback  callback,
774                              gpointer             user_data)
775 {
776   GOutputStreamClass *class;
777   GSimpleAsyncResult *simple;
778   GError *error = NULL;
779
780   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
781   g_return_if_fail (buffer != NULL);
782
783   if (count == 0)
784     {
785       simple = g_simple_async_result_new (G_OBJECT (stream),
786                                           callback,
787                                           user_data,
788                                           g_output_stream_write_async);
789       g_simple_async_result_complete_in_idle (simple);
790       g_object_unref (simple);
791       return;
792     }
793
794   if (((gssize) count) < 0)
795     {
796       g_simple_async_report_error_in_idle (G_OBJECT (stream),
797                                            callback,
798                                            user_data,
799                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
800                                            _("Too large count value passed to %s"),
801                                            G_STRFUNC);
802       return;
803     }
804
805   if (!g_output_stream_set_pending (stream, &error))
806     {
807       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
808                                             callback,
809                                             user_data,
810                                             error);
811       return;
812     }
813   
814   class = G_OUTPUT_STREAM_GET_CLASS (stream);
815
816   stream->priv->outstanding_callback = callback;
817   g_object_ref (stream);
818   class->write_async (stream, buffer, count, io_priority, cancellable,
819                       async_ready_callback_wrapper, user_data);
820 }
821
822 /**
823  * g_output_stream_write_finish:
824  * @stream: a #GOutputStream.
825  * @result: a #GAsyncResult.
826  * @error: a #GError location to store the error occurring, or %NULL to 
827  * ignore.
828  * 
829  * Finishes a stream write operation.
830  * 
831  * Returns: a #gssize containing the number of bytes written to the stream.
832  **/
833 gssize
834 g_output_stream_write_finish (GOutputStream  *stream,
835                               GAsyncResult   *result,
836                               GError        **error)
837 {
838   GSimpleAsyncResult *simple;
839   GOutputStreamClass *class;
840
841   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
842   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
843
844   if (g_async_result_legacy_propagate_error (result, error))
845     return -1;
846
847   if (G_IS_SIMPLE_ASYNC_RESULT (result))
848     {
849       simple = G_SIMPLE_ASYNC_RESULT (result);
850
851       /* Special case writes of 0 bytes */
852       if (g_simple_async_result_get_source_tag (simple) == g_output_stream_write_async)
853         return 0;
854     }
855   
856   class = G_OUTPUT_STREAM_GET_CLASS (stream);
857   return class->write_finish (stream, result, error);
858 }
859
860 static void
861 write_bytes_callback (GObject      *stream,
862                       GAsyncResult *result,
863                       gpointer      user_data)
864 {
865   GSimpleAsyncResult *simple = user_data;
866   GError *error = NULL;
867   gssize nwrote;
868
869   nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream),
870                                          result, &error);
871   if (nwrote == -1)
872     g_simple_async_result_take_error (simple, error);
873   else
874     g_simple_async_result_set_op_res_gssize (simple, nwrote);
875   g_simple_async_result_complete (simple);
876   g_object_unref (simple);
877 }
878
879 /**
880  * g_output_stream_write_bytes_async:
881  * @stream: A #GOutputStream.
882  * @bytes: The bytes to write
883  * @io_priority: the io priority of the request.
884  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
885  * @callback: (scope async): callback to call when the request is satisfied
886  * @user_data: (closure): the data to pass to callback function
887  *
888  * Request an asynchronous write of the data in @bytes to the stream.
889  * When the operation is finished @callback will be called. You can
890  * then call g_output_stream_write_bytes_finish() to get the result of
891  * the operation.
892  *
893  * During an async request no other sync and async calls are allowed,
894  * and will result in %G_IO_ERROR_PENDING errors.
895  *
896  * A #GBytes larger than %G_MAXSSIZE will cause a
897  * %G_IO_ERROR_INVALID_ARGUMENT error.
898  *
899  * On success, the number of bytes written will be passed to the
900  * @callback. It is not an error if this is not the same as the
901  * requested size, as it can happen e.g. on a partial I/O error,
902  * but generally we try to write as many bytes as requested.
903  *
904  * You are guaranteed that this method will never fail with
905  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
906  * method will just wait until this changes.
907  *
908  * Any outstanding I/O request with higher priority (lower numerical
909  * value) will be executed before an outstanding request with lower
910  * priority. Default priority is %G_PRIORITY_DEFAULT.
911  *
912  * For the synchronous, blocking version of this function, see
913  * g_output_stream_write_bytes().
914  **/
915 void
916 g_output_stream_write_bytes_async (GOutputStream       *stream,
917                                    GBytes              *bytes,
918                                    int                  io_priority,
919                                    GCancellable        *cancellable,
920                                    GAsyncReadyCallback  callback,
921                                    gpointer             user_data)
922 {
923   GSimpleAsyncResult *simple;
924   gsize size;
925   gconstpointer data;
926
927   data = g_bytes_get_data (bytes, &size);
928
929   simple = g_simple_async_result_new (G_OBJECT (stream),
930                                       callback, user_data,
931                                       g_output_stream_write_bytes_async);
932   g_simple_async_result_set_op_res_gpointer (simple, g_bytes_ref (bytes),
933                                              (GDestroyNotify) g_bytes_unref);
934
935   g_output_stream_write_async (stream,
936                                data, size,
937                                io_priority,
938                                cancellable,
939                                write_bytes_callback,
940                                simple);
941 }
942
943 /**
944  * g_output_stream_write_bytes_finish:
945  * @stream: a #GOutputStream.
946  * @result: a #GAsyncResult.
947  * @error: a #GError location to store the error occurring, or %NULL to
948  * ignore.
949  *
950  * Finishes a stream write-from-#GBytes operation.
951  *
952  * Returns: a #gssize containing the number of bytes written to the stream.
953  **/
954 gssize
955 g_output_stream_write_bytes_finish (GOutputStream  *stream,
956                                     GAsyncResult   *result,
957                                     GError        **error)
958 {
959   GSimpleAsyncResult *simple;
960
961   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
962   g_return_val_if_fail (g_simple_async_result_is_valid (result, G_OBJECT (stream), g_output_stream_write_bytes_async), -1);
963
964   simple = G_SIMPLE_ASYNC_RESULT (result);
965   if (g_simple_async_result_propagate_error (simple, error))
966     return -1;
967   return g_simple_async_result_get_op_res_gssize (simple);
968 }
969
970 typedef struct {
971   GInputStream *source;
972   gpointer user_data;
973   GAsyncReadyCallback callback;
974 } SpliceUserData;
975
976 static void
977 async_ready_splice_callback_wrapper (GObject      *source_object,
978                                      GAsyncResult *res,
979                                      gpointer     _data)
980 {
981   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
982   SpliceUserData *data = _data;
983   
984   g_output_stream_clear_pending (stream);
985   
986   if (data->callback)
987     (*data->callback) (source_object, res, data->user_data);
988   
989   g_object_unref (stream);
990   g_object_unref (data->source);
991   g_free (data);
992 }
993
994 /**
995  * g_output_stream_splice_async:
996  * @stream: a #GOutputStream.
997  * @source: a #GInputStream. 
998  * @flags: a set of #GOutputStreamSpliceFlags.
999  * @io_priority: the io priority of the request.
1000  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
1001  * @callback: (scope async): a #GAsyncReadyCallback. 
1002  * @user_data: (closure): user data passed to @callback.
1003  * 
1004  * Splices a stream asynchronously.
1005  * When the operation is finished @callback will be called.
1006  * You can then call g_output_stream_splice_finish() to get the 
1007  * result of the operation.
1008  *
1009  * For the synchronous, blocking version of this function, see 
1010  * g_output_stream_splice().
1011  **/
1012 void
1013 g_output_stream_splice_async (GOutputStream            *stream,
1014                               GInputStream             *source,
1015                               GOutputStreamSpliceFlags  flags,
1016                               int                       io_priority,
1017                               GCancellable             *cancellable,
1018                               GAsyncReadyCallback       callback,
1019                               gpointer                  user_data)
1020 {
1021   GOutputStreamClass *class;
1022   SpliceUserData *data;
1023   GError *error = NULL;
1024
1025   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1026   g_return_if_fail (G_IS_INPUT_STREAM (source));
1027
1028   if (g_input_stream_is_closed (source))
1029     {
1030       g_simple_async_report_error_in_idle (G_OBJECT (stream),
1031                                            callback,
1032                                            user_data,
1033                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
1034                                            _("Source stream is already closed"));
1035       return;
1036     }
1037   
1038   if (!g_output_stream_set_pending (stream, &error))
1039     {
1040       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
1041                                             callback,
1042                                             user_data,
1043                                             error);
1044       return;
1045     }
1046
1047   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1048
1049   data = g_new0 (SpliceUserData, 1);
1050   data->callback = callback;
1051   data->user_data = user_data;
1052   data->source = g_object_ref (source);
1053   
1054   g_object_ref (stream);
1055   class->splice_async (stream, source, flags, io_priority, cancellable,
1056                       async_ready_splice_callback_wrapper, data);
1057 }
1058
1059 /**
1060  * g_output_stream_splice_finish:
1061  * @stream: a #GOutputStream.
1062  * @result: a #GAsyncResult.
1063  * @error: a #GError location to store the error occurring, or %NULL to 
1064  * ignore.
1065  *
1066  * Finishes an asynchronous stream splice operation.
1067  * 
1068  * Returns: a #gssize of the number of bytes spliced. Note that if the
1069  *     number of bytes spliced is greater than %G_MAXSSIZE, then that
1070  *     will be returned, and there is no way to determine the actual
1071  *     number of bytes spliced.
1072  **/
1073 gssize
1074 g_output_stream_splice_finish (GOutputStream  *stream,
1075                                GAsyncResult   *result,
1076                                GError        **error)
1077 {
1078   GOutputStreamClass *class;
1079
1080   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
1081   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
1082
1083   if (g_async_result_legacy_propagate_error (result, error))
1084     return -1;
1085   
1086   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1087   return class->splice_finish (stream, result, error);
1088 }
1089
1090 /**
1091  * g_output_stream_flush_async:
1092  * @stream: a #GOutputStream.
1093  * @io_priority: the io priority of the request.
1094  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
1095  * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied
1096  * @user_data: (closure): the data to pass to callback function
1097  * 
1098  * Forces an asynchronous write of all user-space buffered data for
1099  * the given @stream.
1100  * For behaviour details see g_output_stream_flush().
1101  *
1102  * When the operation is finished @callback will be 
1103  * called. You can then call g_output_stream_flush_finish() to get the 
1104  * result of the operation.
1105  **/
1106 void
1107 g_output_stream_flush_async (GOutputStream       *stream,
1108                              int                  io_priority,
1109                              GCancellable        *cancellable,
1110                              GAsyncReadyCallback  callback,
1111                              gpointer             user_data)
1112 {
1113   GOutputStreamClass *class;
1114   GSimpleAsyncResult *simple;
1115   GError *error = NULL;
1116
1117   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1118
1119   if (!g_output_stream_set_pending (stream, &error))
1120     {
1121       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
1122                                             callback,
1123                                             user_data,
1124                                             error);
1125       return;
1126     }
1127
1128   stream->priv->outstanding_callback = callback;
1129   g_object_ref (stream);
1130
1131   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1132   
1133   if (class->flush_async == NULL)
1134     {
1135       simple = g_simple_async_result_new (G_OBJECT (stream),
1136                                           async_ready_callback_wrapper,
1137                                           user_data,
1138                                           g_output_stream_flush_async);
1139       g_simple_async_result_complete_in_idle (simple);
1140       g_object_unref (simple);
1141       return;
1142     }
1143       
1144   class->flush_async (stream, io_priority, cancellable,
1145                       async_ready_callback_wrapper, user_data);
1146 }
1147
1148 /**
1149  * g_output_stream_flush_finish:
1150  * @stream: a #GOutputStream.
1151  * @result: a GAsyncResult.
1152  * @error: a #GError location to store the error occurring, or %NULL to 
1153  * ignore.
1154  * 
1155  * Finishes flushing an output stream.
1156  * 
1157  * Returns: %TRUE if flush operation succeeded, %FALSE otherwise.
1158  **/
1159 gboolean
1160 g_output_stream_flush_finish (GOutputStream  *stream,
1161                               GAsyncResult   *result,
1162                               GError        **error)
1163 {
1164   GSimpleAsyncResult *simple;
1165   GOutputStreamClass *klass;
1166
1167   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1168   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1169
1170   if (g_async_result_legacy_propagate_error (result, error))
1171     return FALSE;
1172
1173   if (G_IS_SIMPLE_ASYNC_RESULT (result))
1174     {
1175       simple = G_SIMPLE_ASYNC_RESULT (result);
1176
1177       /* Special case default implementation */
1178       if (g_simple_async_result_get_source_tag (simple) == g_output_stream_flush_async)
1179         return TRUE;
1180     }
1181
1182   klass = G_OUTPUT_STREAM_GET_CLASS (stream);
1183   return klass->flush_finish (stream, result, error);
1184 }
1185
1186
1187 /**
1188  * g_output_stream_close_async:
1189  * @stream: A #GOutputStream.
1190  * @io_priority: the io priority of the request.
1191  * @cancellable: (allow-none): optional cancellable object
1192  * @callback: (scope async): callback to call when the request is satisfied
1193  * @user_data: (closure): the data to pass to callback function
1194  *
1195  * Requests an asynchronous close of the stream, releasing resources 
1196  * related to it. When the operation is finished @callback will be 
1197  * called. You can then call g_output_stream_close_finish() to get 
1198  * the result of the operation.
1199  *
1200  * For behaviour details see g_output_stream_close().
1201  *
1202  * The asyncronous methods have a default fallback that uses threads 
1203  * to implement asynchronicity, so they are optional for inheriting 
1204  * classes. However, if you override one you must override all.
1205  **/
1206 void
1207 g_output_stream_close_async (GOutputStream       *stream,
1208                              int                  io_priority,
1209                              GCancellable        *cancellable,
1210                              GAsyncReadyCallback  callback,
1211                              gpointer             user_data)
1212 {
1213   GOutputStreamClass *class;
1214   GSimpleAsyncResult *simple;
1215   GError *error = NULL;
1216   CloseUserData *data;
1217
1218   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1219   
1220   if (stream->priv->closed)
1221     {
1222       simple = g_simple_async_result_new (G_OBJECT (stream),
1223                                           callback,
1224                                           user_data,
1225                                           g_output_stream_close_async);
1226       g_simple_async_result_complete_in_idle (simple);
1227       g_object_unref (simple);
1228       return;
1229     }
1230
1231   if (!g_output_stream_set_pending (stream, &error))
1232     {
1233       g_simple_async_report_take_gerror_in_idle (G_OBJECT (stream),
1234                                             callback,
1235                                             user_data,
1236                                             error);
1237       return;
1238     }
1239   
1240   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1241   stream->priv->closing = TRUE;
1242   stream->priv->outstanding_callback = callback;
1243   g_object_ref (stream);
1244
1245   data = g_slice_new0 (CloseUserData);
1246
1247   if (cancellable != NULL)
1248     data->cancellable = g_object_ref (cancellable);
1249
1250   data->io_priority = io_priority;
1251   data->user_data = user_data;
1252
1253   /* Call close_async directly if there is no need to flush, or if the flush
1254      can be done sync (in the output stream async close thread) */
1255   if (class->flush_async == NULL ||
1256       (class->flush_async == g_output_stream_real_flush_async &&
1257        (class->flush == NULL || class->close_async == g_output_stream_real_close_async)))
1258     {
1259       class->close_async (stream, io_priority, cancellable,
1260                           async_ready_close_callback_wrapper, data);
1261     }
1262   else
1263     {
1264       /* First do an async flush, then do the async close in the callback
1265          wrapper (see async_ready_close_flushed_callback_wrapper) */
1266       class->flush_async (stream, io_priority, cancellable,
1267                           async_ready_close_flushed_callback_wrapper, data);
1268     }
1269 }
1270
1271 /**
1272  * g_output_stream_close_finish:
1273  * @stream: a #GOutputStream.
1274  * @result: a #GAsyncResult.
1275  * @error: a #GError location to store the error occurring, or %NULL to 
1276  * ignore.
1277  * 
1278  * Closes an output stream.
1279  * 
1280  * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
1281  **/
1282 gboolean
1283 g_output_stream_close_finish (GOutputStream  *stream,
1284                               GAsyncResult   *result,
1285                               GError        **error)
1286 {
1287   GSimpleAsyncResult *simple;
1288   GOutputStreamClass *class;
1289
1290   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1291   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1292
1293   if (g_async_result_legacy_propagate_error (result, error))
1294     return FALSE;
1295
1296   if (G_IS_SIMPLE_ASYNC_RESULT (result))
1297     {
1298       simple = G_SIMPLE_ASYNC_RESULT (result);
1299
1300       /* Special case already closed */
1301       if (g_simple_async_result_get_source_tag (simple) == g_output_stream_close_async)
1302         return TRUE;
1303     }
1304
1305   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1306   return class->close_finish (stream, result, error);
1307 }
1308
1309 /**
1310  * g_output_stream_is_closed:
1311  * @stream: a #GOutputStream.
1312  * 
1313  * Checks if an output stream has already been closed.
1314  * 
1315  * Returns: %TRUE if @stream is closed. %FALSE otherwise. 
1316  **/
1317 gboolean
1318 g_output_stream_is_closed (GOutputStream *stream)
1319 {
1320   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1321   
1322   return stream->priv->closed;
1323 }
1324
1325 /**
1326  * g_output_stream_is_closing:
1327  * @stream: a #GOutputStream.
1328  *
1329  * Checks if an output stream is being closed. This can be
1330  * used inside e.g. a flush implementation to see if the
1331  * flush (or other i/o operation) is called from within
1332  * the closing operation.
1333  *
1334  * Returns: %TRUE if @stream is being closed. %FALSE otherwise.
1335  *
1336  * Since: 2.24
1337  **/
1338 gboolean
1339 g_output_stream_is_closing (GOutputStream *stream)
1340 {
1341   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1342
1343   return stream->priv->closing;
1344 }
1345
1346 /**
1347  * g_output_stream_has_pending:
1348  * @stream: a #GOutputStream.
1349  * 
1350  * Checks if an ouput stream has pending actions.
1351  * 
1352  * Returns: %TRUE if @stream has pending actions. 
1353  **/
1354 gboolean
1355 g_output_stream_has_pending (GOutputStream *stream)
1356 {
1357   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1358   
1359   return stream->priv->pending;
1360 }
1361
1362 /**
1363  * g_output_stream_set_pending:
1364  * @stream: a #GOutputStream.
1365  * @error: a #GError location to store the error occurring, or %NULL to 
1366  * ignore.
1367  * 
1368  * Sets @stream to have actions pending. If the pending flag is
1369  * already set or @stream is closed, it will return %FALSE and set
1370  * @error.
1371  *
1372  * Return value: %TRUE if pending was previously unset and is now set.
1373  **/
1374 gboolean
1375 g_output_stream_set_pending (GOutputStream *stream,
1376                              GError **error)
1377 {
1378   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1379   
1380   if (stream->priv->closed)
1381     {
1382       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1383                            _("Stream is already closed"));
1384       return FALSE;
1385     }
1386   
1387   if (stream->priv->pending)
1388     {
1389       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1390                            /* Translators: This is an error you get if there is
1391                             * already an operation running against this stream when
1392                             * you try to start one */
1393                            _("Stream has outstanding operation"));
1394       return FALSE;
1395     }
1396   
1397   stream->priv->pending = TRUE;
1398   return TRUE;
1399 }
1400
1401 /**
1402  * g_output_stream_clear_pending:
1403  * @stream: output stream
1404  * 
1405  * Clears the pending flag on @stream.
1406  **/
1407 void
1408 g_output_stream_clear_pending (GOutputStream *stream)
1409 {
1410   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1411   
1412   stream->priv->pending = FALSE;
1413 }
1414
1415
1416 /********************************************
1417  *   Default implementation of async ops    *
1418  ********************************************/
1419
1420 typedef struct {
1421   const void         *buffer;
1422   gsize               count_requested;
1423   gssize              count_written;
1424
1425   GCancellable       *cancellable;
1426   gint                io_priority;
1427   gboolean            need_idle;
1428 } WriteData;
1429
1430 static void
1431 free_write_data (WriteData *op)
1432 {
1433   if (op->cancellable)
1434     g_object_unref (op->cancellable);
1435   g_slice_free (WriteData, op);
1436 }
1437
1438 static void
1439 write_async_thread (GSimpleAsyncResult *res,
1440                     GObject            *object,
1441                     GCancellable       *cancellable)
1442 {
1443   WriteData *op;
1444   GOutputStreamClass *class;
1445   GError *error = NULL;
1446
1447   class = G_OUTPUT_STREAM_GET_CLASS (object);
1448   op = g_simple_async_result_get_op_res_gpointer (res);
1449   op->count_written = class->write_fn (G_OUTPUT_STREAM (object), op->buffer, op->count_requested,
1450                                        cancellable, &error);
1451   if (op->count_written == -1)
1452     g_simple_async_result_take_error (res, error);
1453 }
1454
1455 static void write_async_pollable (GPollableOutputStream *stream,
1456                                   GSimpleAsyncResult    *result);
1457
1458 static gboolean
1459 write_async_pollable_ready (GPollableOutputStream *stream,
1460                             gpointer               user_data)
1461 {
1462   GSimpleAsyncResult *result = user_data;
1463
1464   write_async_pollable (stream, result);
1465   return FALSE;
1466 }
1467
1468 static void
1469 write_async_pollable (GPollableOutputStream *stream,
1470                       GSimpleAsyncResult    *result)
1471 {
1472   GError *error = NULL;
1473   WriteData *op = g_simple_async_result_get_op_res_gpointer (result);
1474
1475   if (g_cancellable_set_error_if_cancelled (op->cancellable, &error))
1476     op->count_written = -1;
1477   else
1478     {
1479       op->count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
1480         write_nonblocking (stream, op->buffer, op->count_requested, &error);
1481     }
1482
1483   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1484     {
1485       GSource *source;
1486
1487       g_error_free (error);
1488       op->need_idle = FALSE;
1489
1490       source = g_pollable_output_stream_create_source (stream, op->cancellable);
1491       g_source_set_callback (source,
1492                              (GSourceFunc) write_async_pollable_ready,
1493                              g_object_ref (result), g_object_unref);
1494       g_source_set_priority (source, op->io_priority);
1495       g_source_attach (source, g_main_context_get_thread_default ());
1496       g_source_unref (source);
1497       return;
1498     }
1499
1500   if (op->count_written == -1)
1501     g_simple_async_result_take_error (result, error);
1502
1503   if (op->need_idle)
1504     g_simple_async_result_complete_in_idle (result);
1505   else
1506     g_simple_async_result_complete (result);
1507 }
1508
1509 static void
1510 g_output_stream_real_write_async (GOutputStream       *stream,
1511                                   const void          *buffer,
1512                                   gsize                count,
1513                                   int                  io_priority,
1514                                   GCancellable        *cancellable,
1515                                   GAsyncReadyCallback  callback,
1516                                   gpointer             user_data)
1517 {
1518   GSimpleAsyncResult *res;
1519   WriteData *op;
1520
1521   op = g_slice_new0 (WriteData);
1522   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async);
1523   g_simple_async_result_set_op_res_gpointer (res, op, (GDestroyNotify) free_write_data);
1524   op->buffer = buffer;
1525   op->count_requested = count;
1526   op->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
1527   op->io_priority = io_priority;
1528   op->need_idle = TRUE;
1529   
1530   if (G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
1531       g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream)))
1532     write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), res);
1533   else
1534     g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable);
1535   g_object_unref (res);
1536 }
1537
1538 static gssize
1539 g_output_stream_real_write_finish (GOutputStream  *stream,
1540                                    GAsyncResult   *result,
1541                                    GError        **error)
1542 {
1543   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1544   WriteData *op;
1545
1546   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_write_async);
1547
1548   if (g_simple_async_result_propagate_error (simple, error))
1549     return -1;
1550
1551   op = g_simple_async_result_get_op_res_gpointer (simple);
1552   return op->count_written;
1553 }
1554
1555 typedef struct {
1556   GInputStream *source;
1557   GOutputStreamSpliceFlags flags;
1558   gssize bytes_copied;
1559 } SpliceData;
1560
1561 static void
1562 splice_async_thread (GSimpleAsyncResult *result,
1563                      GObject            *object,
1564                      GCancellable       *cancellable)
1565 {
1566   SpliceData *op;
1567   GOutputStreamClass *class;
1568   GError *error = NULL;
1569   GOutputStream *stream;
1570
1571   stream = G_OUTPUT_STREAM (object);
1572   class = G_OUTPUT_STREAM_GET_CLASS (object);
1573   op = g_simple_async_result_get_op_res_gpointer (result);
1574   
1575   op->bytes_copied = class->splice (stream,
1576                                     op->source,
1577                                     op->flags,
1578                                     cancellable,
1579                                     &error);
1580   if (op->bytes_copied == -1)
1581     g_simple_async_result_take_error (result, error);
1582 }
1583
1584 static void
1585 g_output_stream_real_splice_async (GOutputStream             *stream,
1586                                    GInputStream              *source,
1587                                    GOutputStreamSpliceFlags   flags,
1588                                    int                        io_priority,
1589                                    GCancellable              *cancellable,
1590                                    GAsyncReadyCallback        callback,
1591                                    gpointer                   user_data)
1592 {
1593   GSimpleAsyncResult *res;
1594   SpliceData *op;
1595
1596   op = g_new0 (SpliceData, 1);
1597   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_splice_async);
1598   g_simple_async_result_set_op_res_gpointer (res, op, g_free);
1599   op->flags = flags;
1600   op->source = source;
1601
1602   /* TODO: In the case where both source and destintion have
1603      non-threadbased async calls we can use a true async copy here */
1604   
1605   g_simple_async_result_run_in_thread (res, splice_async_thread, io_priority, cancellable);
1606   g_object_unref (res);
1607 }
1608
1609 static gssize
1610 g_output_stream_real_splice_finish (GOutputStream  *stream,
1611                                     GAsyncResult   *result,
1612                                     GError        **error)
1613 {
1614   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1615   SpliceData *op;
1616
1617   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_splice_async);
1618
1619   if (g_simple_async_result_propagate_error (simple, error))
1620     return -1;
1621
1622   op = g_simple_async_result_get_op_res_gpointer (simple);
1623   return op->bytes_copied;
1624 }
1625
1626
1627 static void
1628 flush_async_thread (GSimpleAsyncResult *res,
1629                     GObject            *object,
1630                     GCancellable       *cancellable)
1631 {
1632   GOutputStreamClass *class;
1633   gboolean result;
1634   GError *error = NULL;
1635
1636   class = G_OUTPUT_STREAM_GET_CLASS (object);
1637   result = TRUE;
1638   if (class->flush)
1639     result = class->flush (G_OUTPUT_STREAM (object), cancellable, &error);
1640
1641   if (!result)
1642     g_simple_async_result_take_error (res, error);
1643 }
1644
1645 static void
1646 g_output_stream_real_flush_async (GOutputStream       *stream,
1647                                   int                  io_priority,
1648                                   GCancellable        *cancellable,
1649                                   GAsyncReadyCallback  callback,
1650                                   gpointer             user_data)
1651 {
1652   GSimpleAsyncResult *res;
1653
1654   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async);
1655   
1656   g_simple_async_result_run_in_thread (res, flush_async_thread, io_priority, cancellable);
1657   g_object_unref (res);
1658 }
1659
1660 static gboolean
1661 g_output_stream_real_flush_finish (GOutputStream  *stream,
1662                                    GAsyncResult   *result,
1663                                    GError        **error)
1664 {
1665   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1666
1667   if (g_simple_async_result_propagate_error (simple, error))
1668     return FALSE;
1669   return TRUE;
1670 }
1671
1672 static void
1673 close_async_thread (GSimpleAsyncResult *res,
1674                     GObject            *object,
1675                     GCancellable       *cancellable)
1676 {
1677   GOutputStreamClass *class;
1678   GError *error = NULL;
1679   gboolean result = TRUE;
1680
1681   class = G_OUTPUT_STREAM_GET_CLASS (object);
1682
1683   /* Do a flush here if there is a flush function, and we did not have to do
1684      an async flush before (see g_output_stream_close_async) */
1685   if (class->flush != NULL &&
1686       (class->flush_async == NULL ||
1687        class->flush_async == g_output_stream_real_flush_async))
1688     {
1689       result = class->flush (G_OUTPUT_STREAM (object), cancellable, &error);
1690     }
1691
1692   /* Auto handling of cancelation disabled, and ignore
1693      cancellation, since we want to close things anyway, although
1694      possibly in a quick-n-dirty way. At least we never want to leak
1695      open handles */
1696   
1697   if (class->close_fn)
1698     {
1699       /* Make sure to close, even if the flush failed (see sync close) */
1700       if (!result)
1701         class->close_fn (G_OUTPUT_STREAM (object), cancellable, NULL);
1702       else
1703         result = class->close_fn (G_OUTPUT_STREAM (object), cancellable, &error);
1704
1705       if (!result)
1706         g_simple_async_result_take_error (res, error);
1707     }
1708 }
1709
1710 static void
1711 g_output_stream_real_close_async (GOutputStream       *stream,
1712                                   int                  io_priority,
1713                                   GCancellable        *cancellable,
1714                                   GAsyncReadyCallback  callback,
1715                                   gpointer             user_data)
1716 {
1717   GSimpleAsyncResult *res;
1718   
1719   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_close_async);
1720
1721   g_simple_async_result_set_handle_cancellation (res, FALSE);
1722   
1723   g_simple_async_result_run_in_thread (res, close_async_thread, io_priority, cancellable);
1724   g_object_unref (res);
1725 }
1726
1727 static gboolean
1728 g_output_stream_real_close_finish (GOutputStream  *stream,
1729                                    GAsyncResult   *result,
1730                                    GError        **error)
1731 {
1732   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1733
1734   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_close_async);
1735
1736   if (g_simple_async_result_propagate_error (simple, error))
1737     return FALSE;
1738   return TRUE;
1739 }