Fix minor mem leak in test case
[platform/upstream/glib.git] / gio / goutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: Alexander Larsson <alexl@redhat.com>
19  */
20
21 #include "config.h"
22 #include <string.h>
23 #include "goutputstream.h"
24 #include "gcancellable.h"
25 #include "gasyncresult.h"
26 #include "gtask.h"
27 #include "ginputstream.h"
28 #include "gioerror.h"
29 #include "gioprivate.h"
30 #include "glibintl.h"
31 #include "gpollableoutputstream.h"
32
33 /**
34  * SECTION:goutputstream
35  * @short_description: Base class for implementing streaming output
36  * @include: gio/gio.h
37  *
38  * #GOutputStream has functions to write to a stream (g_output_stream_write()),
39  * to close a stream (g_output_stream_close()) and to flush pending writes
40  * (g_output_stream_flush()). 
41  *
42  * To copy the content of an input stream to an output stream without 
43  * manually handling the reads and writes, use g_output_stream_splice(). 
44  *
45  * All of these functions have async variants too.
46  **/
47
48 struct _GOutputStreamPrivate { /* Private per-instance state of a GOutputStream. */
49   guint closed : 1; /* Set to TRUE at the end of g_output_stream_internal_close(); checked by dispose and close. */
50   guint pending : 1; /* NOTE(review): presumably the flag managed by g_output_stream_set_pending()/clear_pending() - confirm. */
51   guint closing : 1; /* TRUE only while g_output_stream_internal_close() runs the flush/close_fn vfuncs. */
52   GAsyncReadyCallback outstanding_callback; /* NOTE(review): not used in the visible part of the file; purpose to be confirmed. */
53 };
54
55 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GOutputStream, g_output_stream, G_TYPE_OBJECT)
56
57 static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
58                                                     GInputStream              *source,
59                                                     GOutputStreamSpliceFlags   flags,
60                                                     GCancellable              *cancellable,
61                                                     GError                   **error);
62 static void     g_output_stream_real_write_async   (GOutputStream             *stream,
63                                                     const void                *buffer,
64                                                     gsize                      count,
65                                                     int                        io_priority,
66                                                     GCancellable              *cancellable,
67                                                     GAsyncReadyCallback        callback,
68                                                     gpointer                   data);
69 static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
70                                                     GAsyncResult              *result,
71                                                     GError                   **error);
72 static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
73                                                     GInputStream              *source,
74                                                     GOutputStreamSpliceFlags   flags,
75                                                     int                        io_priority,
76                                                     GCancellable              *cancellable,
77                                                     GAsyncReadyCallback        callback,
78                                                     gpointer                   data);
79 static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
80                                                     GAsyncResult              *result,
81                                                     GError                   **error);
82 static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
83                                                     int                        io_priority,
84                                                     GCancellable              *cancellable,
85                                                     GAsyncReadyCallback        callback,
86                                                     gpointer                   data);
87 static gboolean g_output_stream_real_flush_finish  (GOutputStream             *stream,
88                                                     GAsyncResult              *result,
89                                                     GError                   **error);
90 static void     g_output_stream_real_close_async   (GOutputStream             *stream,
91                                                     int                        io_priority,
92                                                     GCancellable              *cancellable,
93                                                     GAsyncReadyCallback        callback,
94                                                     gpointer                   data);
95 static gboolean g_output_stream_real_close_finish  (GOutputStream             *stream,
96                                                     GAsyncResult              *result,
97                                                     GError                   **error);
98 static gboolean g_output_stream_internal_close     (GOutputStream             *stream,
99                                                     GCancellable              *cancellable,
100                                                     GError                   **error);
101 static void     g_output_stream_internal_close_async (GOutputStream           *stream,
102                                                       int                      io_priority,
103                                                       GCancellable            *cancellable,
104                                                       GAsyncReadyCallback      callback,
105                                                       gpointer                 data);
106 static gboolean g_output_stream_internal_close_finish (GOutputStream          *stream,
107                                                        GAsyncResult           *result,
108                                                        GError                **error);
109
110 static void
111 g_output_stream_dispose (GObject *object)
112 {
113   GOutputStream *stream;
114
115   stream = G_OUTPUT_STREAM (object);
116   
117   if (!stream->priv->closed)
118     g_output_stream_close (stream, NULL, NULL);
119
120   G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object);
121 }
122
123 static void
124 g_output_stream_class_init (GOutputStreamClass *klass)
125 {
126   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
127
128   gobject_class->dispose = g_output_stream_dispose;
129
130   klass->splice = g_output_stream_real_splice;
131   
132   klass->write_async = g_output_stream_real_write_async;
133   klass->write_finish = g_output_stream_real_write_finish;
134   klass->splice_async = g_output_stream_real_splice_async;
135   klass->splice_finish = g_output_stream_real_splice_finish;
136   klass->flush_async = g_output_stream_real_flush_async;
137   klass->flush_finish = g_output_stream_real_flush_finish;
138   klass->close_async = g_output_stream_real_close_async;
139   klass->close_finish = g_output_stream_real_close_finish;
140 }
141
142 static void
143 g_output_stream_init (GOutputStream *stream)
144 {
145   stream->priv = g_output_stream_get_instance_private (stream);
146 }
147
148 /**
149  * g_output_stream_write:
150  * @stream: a #GOutputStream.
151  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
152  * @count: the number of bytes to write
153  * @cancellable: (allow-none): optional cancellable object
154  * @error: location to store the error occurring, or %NULL to ignore
155  *
156  * Tries to write @count bytes from @buffer into the stream. Will block
157  * during the operation.
158  * 
159  * If count is 0, returns 0 and does nothing. A value of @count
160  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
161  *
162  * On success, the number of bytes written to the stream is returned.
163  * It is not an error if this is not the same as the requested size, as it
164  * can happen e.g. on a partial I/O error, or if there is not enough
165  * storage in the stream. All writes block until at least one byte
166  * is written or an error occurs; 0 is never returned (unless
167  * @count is 0).
168  * 
169  * If @cancellable is not %NULL, then the operation can be cancelled by
170  * triggering the cancellable object from another thread. If the operation
171  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
172  * operation was partially finished when the operation was cancelled the
173  * partial result will be returned, without an error.
174  *
175  * On error -1 is returned and @error is set accordingly.
176  * 
177  * Virtual: write_fn
178  *
179  * Returns: Number of bytes written, or -1 on error
180  **/
181 gssize
182 g_output_stream_write (GOutputStream  *stream,
183                        const void     *buffer,
184                        gsize           count,
185                        GCancellable   *cancellable,
186                        GError        **error)
187 {
188   GOutputStreamClass *class;
189   gssize res;
190
191   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
192   g_return_val_if_fail (buffer != NULL, 0);
193
194   if (count == 0)
195     return 0;
196   
197   if (((gssize) count) < 0)
198     {
199       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
200                    _("Too large count value passed to %s"), G_STRFUNC);
201       return -1;
202     }
203
204   class = G_OUTPUT_STREAM_GET_CLASS (stream);
205
206   if (class->write_fn == NULL) 
207     {
208       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
209                            _("Output stream doesn't implement write"));
210       return -1;
211     }
212   
213   if (!g_output_stream_set_pending (stream, error))
214     return -1;
215   
216   if (cancellable)
217     g_cancellable_push_current (cancellable);
218   
219   res = class->write_fn (stream, buffer, count, cancellable, error);
220   
221   if (cancellable)
222     g_cancellable_pop_current (cancellable);
223   
224   g_output_stream_clear_pending (stream);
225
226   return res; 
227 }
228
229 /**
230  * g_output_stream_write_all:
231  * @stream: a #GOutputStream.
232  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
233  * @count: the number of bytes to write
234  * @bytes_written: (out): location to store the number of bytes that was 
235  *     written to the stream
236  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
237  * @error: location to store the error occurring, or %NULL to ignore
238  *
239  * Tries to write @count bytes from @buffer into the stream. Will block
240  * during the operation.
241  * 
242  * This function is similar to g_output_stream_write(), except it tries to
243  * write as many bytes as requested, only stopping on an error.
244  *
245  * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
246  * is set to @count.
247  * 
248  * If there is an error during the operation %FALSE is returned and @error
249  * is set to indicate the error status, @bytes_written is updated to contain
250  * the number of bytes written into the stream before the error occurred.
251  *
252  * Returns: %TRUE on success, %FALSE if there was an error
253  **/
254 gboolean
255 g_output_stream_write_all (GOutputStream  *stream,
256                            const void     *buffer,
257                            gsize           count,
258                            gsize          *bytes_written,
259                            GCancellable   *cancellable,
260                            GError        **error)
261 {
262   gsize _bytes_written;
263   gssize res;
264
265   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
266   g_return_val_if_fail (buffer != NULL, FALSE);
267
268   _bytes_written = 0;
269   while (_bytes_written < count)
270     {
271       res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
272                                    cancellable, error);
273       if (res == -1)
274         {
275           if (bytes_written)
276             *bytes_written = _bytes_written;
277           return FALSE;
278         }
279       
280       if (res == 0)
281         g_warning ("Write returned zero without error");
282
283       _bytes_written += res;
284     }
285   
286   if (bytes_written)
287     *bytes_written = _bytes_written;
288
289   return TRUE;
290 }
291
292 /**
293  * g_output_stream_printf:
294  * @stream: a #GOutputStream.
295  * @bytes_written: (out): location to store the number of bytes that was
296  *     written to the stream
297  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
298  * @error: location to store the error occurring, or %NULL to ignore
299  * @format: the format string. See the printf() documentation
300  * @...: the parameters to insert into the format string
301  *
302  * This is a utility function around g_output_stream_write_all(). It
303  * uses g_strdup_vprintf() to turn @format and @... into a string that
304  * is then written to @stream.
305  *
306  * See the documentation of g_output_stream_write_all() about the
307  * behavior of the actual write operation.
308  *
309  * Note that partial writes cannot be properly checked with this
310  * function due to the variable length of the written string, if you
311  * need precise control over partial write failures, you need to
312  * create your own printf()-like wrapper around g_output_stream_write()
313  * or g_output_stream_write_all().
314  *
315  * Since: 2.40
316  *
317  * Returns: %TRUE on success, %FALSE if there was an error
318  **/
319 gboolean
320 g_output_stream_printf (GOutputStream  *stream,
321                         gsize          *bytes_written,
322                         GCancellable   *cancellable,
323                         GError        **error,
324                         const gchar    *format,
325                         ...)
326 {
327   va_list  args;
328   gboolean success;
329
330   va_start (args, format);
331   success = g_output_stream_vprintf (stream, bytes_written, cancellable,
332                                      error, format, args);
333   va_end (args);
334
335   return success;
336 }
337
338 /**
339  * g_output_stream_vprintf:
340  * @stream: a #GOutputStream.
341  * @bytes_written: (out): location to store the number of bytes that was
342  *     written to the stream
343  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
344  * @error: location to store the error occurring, or %NULL to ignore
345  * @format: the format string. See the printf() documentation
346  * @args: the parameters to insert into the format string
347  *
348  * This is a utility function around g_output_stream_write_all(). It
349  * uses g_strdup_vprintf() to turn @format and @args into a string that
350  * is then written to @stream.
351  *
352  * See the documentation of g_output_stream_write_all() about the
353  * behavior of the actual write operation.
354  *
355  * Note that partial writes cannot be properly checked with this
356  * function due to the variable length of the written string, if you
357  * need precise control over partial write failures, you need to
358  * create your own printf()-like wrapper around g_output_stream_write()
359  * or g_output_stream_write_all().
360  *
361  * Since: 2.40
362  *
363  * Returns: %TRUE on success, %FALSE if there was an error
364  **/
365 gboolean
366 g_output_stream_vprintf (GOutputStream  *stream,
367                          gsize          *bytes_written,
368                          GCancellable   *cancellable,
369                          GError        **error,
370                          const gchar    *format,
371                          va_list         args)
372 {
373   gchar    *text;
374   gboolean  success;
375
376   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
377   g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (stream), FALSE);
378   g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
379   g_return_val_if_fail (format != NULL, FALSE);
380
381   text = g_strdup_vprintf (format, args);
382   success = g_output_stream_write_all (stream,
383                                        text, strlen (text),
384                                        bytes_written, cancellable, error);
385   g_free (text);
386
387   return success;
388 }
389
390 /**
391  * g_output_stream_write_bytes:
392  * @stream: a #GOutputStream.
393  * @bytes: the #GBytes to write
394  * @cancellable: (allow-none): optional cancellable object
395  * @error: location to store the error occurring, or %NULL to ignore
396  *
397  * A wrapper function for g_output_stream_write() which takes a
398  * #GBytes as input.  This can be more convenient for use by language
399  * bindings or in other cases where the refcounted nature of #GBytes
400  * is helpful over a bare pointer interface.
401  *
402  * However, note that this function may still perform partial writes,
403  * just like g_output_stream_write().  If that occurs, to continue
404  * writing, you will need to create a new #GBytes containing just the
405  * remaining bytes, using g_bytes_new_from_bytes(). Passing the same
406  * #GBytes instance multiple times potentially can result in duplicated
407  * data in the output stream.
408  *
409  * Returns: Number of bytes written, or -1 on error
410  **/
411 gssize
412 g_output_stream_write_bytes (GOutputStream  *stream,
413                              GBytes         *bytes,
414                              GCancellable   *cancellable,
415                              GError        **error)
416 {
417   gsize size;
418   gconstpointer data;
419
420   data = g_bytes_get_data (bytes, &size);
421
422   return g_output_stream_write (stream,
423                                 data, size,
424                                 cancellable,
425                                 error);
426 }
427
428 /**
429  * g_output_stream_flush:
430  * @stream: a #GOutputStream.
431  * @cancellable: (allow-none): optional cancellable object
432  * @error: location to store the error occurring, or %NULL to ignore
433  *
434  * Forces a write of all user-space buffered data for the given
435  * @stream. Will block during the operation. Closing the stream will
436  * implicitly cause a flush.
437  *
438  * This function is optional for inherited classes.
439  * 
440  * If @cancellable is not %NULL, then the operation can be cancelled by
441  * triggering the cancellable object from another thread. If the operation
442  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
443  *
444  * Returns: %TRUE on success, %FALSE on error
445  **/
446 gboolean
447 g_output_stream_flush (GOutputStream  *stream,
448                        GCancellable   *cancellable,
449                        GError        **error)
450 {
451   GOutputStreamClass *class;
452   gboolean res;
453
454   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
455
456   if (!g_output_stream_set_pending (stream, error))
457     return FALSE;
458   
459   class = G_OUTPUT_STREAM_GET_CLASS (stream);
460
461   res = TRUE;
462   if (class->flush)
463     {
464       if (cancellable)
465         g_cancellable_push_current (cancellable);
466       
467       res = class->flush (stream, cancellable, error);
468       
469       if (cancellable)
470         g_cancellable_pop_current (cancellable);
471     }
472   
473   g_output_stream_clear_pending (stream);
474
475   return res;
476 }
477
478 /**
479  * g_output_stream_splice:
480  * @stream: a #GOutputStream.
481  * @source: a #GInputStream.
482  * @flags: a set of #GOutputStreamSpliceFlags.
483  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
484  * @error: a #GError location to store the error occurring, or %NULL to
485  * ignore.
486  *
487  * Splices an input stream into an output stream.
488  *
489  * Returns: a #gssize containing the size of the data spliced, or
490  *     -1 if an error occurred. Note that if the number of bytes
491  *     spliced is greater than %G_MAXSSIZE, then that will be
492  *     returned, and there is no way to determine the actual number
493  *     of bytes spliced.
494  **/
495 gssize
496 g_output_stream_splice (GOutputStream             *stream,
497                         GInputStream              *source,
498                         GOutputStreamSpliceFlags   flags,
499                         GCancellable              *cancellable,
500                         GError                   **error)
501 {
502   GOutputStreamClass *class;
503   gssize bytes_copied;
504
505   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
506   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
507
508   if (g_input_stream_is_closed (source))
509     {
510       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
511                            _("Source stream is already closed"));
512       return -1;
513     }
514
515   if (!g_output_stream_set_pending (stream, error))
516     return -1;
517
518   class = G_OUTPUT_STREAM_GET_CLASS (stream);
519
520   if (cancellable)
521     g_cancellable_push_current (cancellable);
522
523   bytes_copied = class->splice (stream, source, flags, cancellable, error);
524
525   if (cancellable)
526     g_cancellable_pop_current (cancellable);
527
528   g_output_stream_clear_pending (stream);
529
530   return bytes_copied;
531 }
532
533 static gssize
534 g_output_stream_real_splice (GOutputStream             *stream,
535                              GInputStream              *source,
536                              GOutputStreamSpliceFlags   flags,
537                              GCancellable              *cancellable,
538                              GError                   **error)
539 {
540   GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
541   gssize n_read, n_written;
542   gsize bytes_copied;
543   char buffer[8192], *p;
544   gboolean res;
545
546   bytes_copied = 0;
547   if (class->write_fn == NULL)
548     {
549       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
550                            _("Output stream doesn't implement write"));
551       res = FALSE;
552       goto notsupported;
553     }
554
555   res = TRUE;
556   do
557     {
558       n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
559       if (n_read == -1)
560         {
561           res = FALSE;
562           break;
563         }
564
565       if (n_read == 0)
566         break;
567
568       p = buffer;
569       while (n_read > 0)
570         {
571           n_written = class->write_fn (stream, p, n_read, cancellable, error);
572           if (n_written == -1)
573             {
574               res = FALSE;
575               break;
576             }
577
578           p += n_written;
579           n_read -= n_written;
580           bytes_copied += n_written;
581         }
582
583       if (bytes_copied > G_MAXSSIZE)
584         bytes_copied = G_MAXSSIZE;
585     }
586   while (res);
587
588  notsupported:
589   if (!res)
590     error = NULL; /* Ignore further errors */
591
592   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
593     {
594       /* Don't care about errors in source here */
595       g_input_stream_close (source, cancellable, NULL);
596     }
597
598   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
599     {
600       /* But write errors on close are bad! */
601       res = g_output_stream_internal_close (stream, cancellable, error);
602     }
603
604   if (res)
605     return bytes_copied;
606
607   return -1;
608 }
609
610 /* Must always be called inside
611  * g_output_stream_set_pending()/g_output_stream_clear_pending(). */
612 static gboolean
613 g_output_stream_internal_close (GOutputStream  *stream,
614                                 GCancellable   *cancellable,
615                                 GError        **error)
616 {
617   GOutputStreamClass *class;
618   gboolean res;
619
620   if (stream->priv->closed)
621     return TRUE;
622
623   class = G_OUTPUT_STREAM_GET_CLASS (stream);
624
625   stream->priv->closing = TRUE;
626
627   if (cancellable)
628     g_cancellable_push_current (cancellable);
629
630   if (class->flush)
631     res = class->flush (stream, cancellable, error);
632   else
633     res = TRUE;
634
635   if (!res)
636     {
637       /* flushing caused the error that we want to return,
638        * but we still want to close the underlying stream if possible
639        */
640       if (class->close_fn)
641         class->close_fn (stream, cancellable, NULL);
642     }
643   else
644     {
645       res = TRUE;
646       if (class->close_fn)
647         res = class->close_fn (stream, cancellable, error);
648     }
649
650   if (cancellable)
651     g_cancellable_pop_current (cancellable);
652
653   stream->priv->closing = FALSE;
654   stream->priv->closed = TRUE;
655
656   return res;
657 }
658
659 /**
660  * g_output_stream_close:
661  * @stream: A #GOutputStream.
662  * @cancellable: (allow-none): optional cancellable object
663  * @error: location to store the error occurring, or %NULL to ignore
664  *
665  * Closes the stream, releasing resources related to it.
666  *
667  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
668  * Closing a stream multiple times will not return an error.
669  *
670  * Closing a stream will automatically flush any outstanding buffers in the
671  * stream.
672  *
673  * Streams will be automatically closed when the last reference
674  * is dropped, but you might want to call this function to make sure 
675  * resources are released as early as possible.
676  *
677  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
678  * open after the stream is closed. See the documentation for the individual
679  * stream for details.
680  *
681  * On failure the first error that happened will be reported, but the close
682  * operation will finish as much as possible. A stream that failed to
683  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
684  * is important to check and report the error to the user, otherwise
685  * there might be a loss of data as all data might not be written.
686  * 
687  * If @cancellable is not %NULL, then the operation can be cancelled by
688  * triggering the cancellable object from another thread. If the operation
689  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
690  * Cancelling a close will still leave the stream closed, but some streams
691  * can use a faster close that doesn't block to e.g. check errors. On
692  * cancellation (as with any error) there is no guarantee that all written
693  * data will reach the target. 
694  *
695  * Returns: %TRUE on success, %FALSE on failure
696  **/
697 gboolean
698 g_output_stream_close (GOutputStream  *stream,
699                        GCancellable   *cancellable,
700                        GError        **error)
701 {
702   gboolean res;
703
704   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
705
706   if (stream->priv->closed)
707     return TRUE;
708
709   if (!g_output_stream_set_pending (stream, error))
710     return FALSE;
711
712   res = g_output_stream_internal_close (stream, cancellable, error);
713
714   g_output_stream_clear_pending (stream);
715   
716   return res;
717 }
718
719 static void
720 async_ready_write_callback_wrapper (GObject      *source_object,
721                                     GAsyncResult *res,
722                                     gpointer      user_data)
723 {
724   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
725   GOutputStreamClass *class;
726   GTask *task = user_data;
727   gssize nwrote;
728   GError *error = NULL;
729
730   g_output_stream_clear_pending (stream);
731   
732   if (g_async_result_legacy_propagate_error (res, &error))
733     nwrote = -1;
734   else
735     {
736       class = G_OUTPUT_STREAM_GET_CLASS (stream);
737       nwrote = class->write_finish (stream, res, &error);
738     }
739
740   if (nwrote >= 0)
741     g_task_return_int (task, nwrote);
742   else
743     g_task_return_error (task, error);
744   g_object_unref (task);
745 }
746
747 /**
748  * g_output_stream_write_async:
749  * @stream: A #GOutputStream.
750  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
751  * @count: the number of bytes to write
752  * @io_priority: the io priority of the request.
753  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
754  * @callback: (scope async): callback to call when the request is satisfied
755  * @user_data: (closure): the data to pass to callback function
756  *
757  * Request an asynchronous write of @count bytes from @buffer into 
758  * the stream. When the operation is finished @callback will be called.
759  * You can then call g_output_stream_write_finish() to get the result of the 
760  * operation.
761  *
762  * During an async request no other sync and async calls are allowed, 
763  * and will result in %G_IO_ERROR_PENDING errors. 
764  *
765  * A value of @count larger than %G_MAXSSIZE will cause a 
766  * %G_IO_ERROR_INVALID_ARGUMENT error.
767  *
768  * On success, the number of bytes written will be passed to the
769  * @callback. It is not an error if this is not the same as the 
770  * requested size, as it can happen e.g. on a partial I/O error, 
771  * but generally we try to write as many bytes as requested. 
772  *
773  * You are guaranteed that this method will never fail with
774  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
775  * method will just wait until this changes.
776  *
777  * Any outstanding I/O request with higher priority (lower numerical 
778  * value) will be executed before an outstanding request with lower 
779  * priority. Default priority is %G_PRIORITY_DEFAULT.
780  *
781  * The asynchronous methods have a default fallback that uses threads 
782  * to implement asynchronicity, so they are optional for inheriting 
783  * classes. However, if you override one you must override all.
784  *
785  * For the synchronous, blocking version of this function, see 
786  * g_output_stream_write().
787  *
788  * Note that no copy of @buffer will be made, so it must stay valid
789  * until @callback is called. See g_output_stream_write_bytes_async()
790  * for a #GBytes version that will automatically hold a reference to
791  * the contents (without copying) for the duration of the call.
792  */
793 void
794 g_output_stream_write_async (GOutputStream       *stream,
795                              const void          *buffer,
796                              gsize                count,
797                              int                  io_priority,
798                              GCancellable        *cancellable,
799                              GAsyncReadyCallback  callback,
800                              gpointer             user_data)
801 {
802   GOutputStreamClass *class;
803   GError *error = NULL;
804   GTask *task;
805
806   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
807   g_return_if_fail (buffer != NULL);
808
809   task = g_task_new (stream, cancellable, callback, user_data);
810   g_task_set_source_tag (task, g_output_stream_write_async);
811   g_task_set_priority (task, io_priority);
812
813   if (count == 0)
814     {
815       g_task_return_int (task, 0);
816       g_object_unref (task);
817       return;
818     }
819
820   if (((gssize) count) < 0)
821     {
822       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
823                                _("Too large count value passed to %s"),
824                                G_STRFUNC);
825       g_object_unref (task);
826       return;
827     }
828
829   if (!g_output_stream_set_pending (stream, &error))
830     {
831       g_task_return_error (task, error);
832       g_object_unref (task);
833       return;
834     }
835   
836   class = G_OUTPUT_STREAM_GET_CLASS (stream);
837
838   class->write_async (stream, buffer, count, io_priority, cancellable,
839                       async_ready_write_callback_wrapper, task);
840 }
841
842 /**
843  * g_output_stream_write_finish:
844  * @stream: a #GOutputStream.
845  * @result: a #GAsyncResult.
846  * @error: a #GError location to store the error occurring, or %NULL to 
847  * ignore.
848  * 
849  * Finishes a stream write operation.
850  * 
851  * Returns: a #gssize containing the number of bytes written to the stream.
852  **/
853 gssize
854 g_output_stream_write_finish (GOutputStream  *stream,
855                               GAsyncResult   *result,
856                               GError        **error)
857 {
858   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
859   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
860   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_write_async), FALSE);
861
862   /* @result is always the GTask created by g_output_stream_write_async();
863    * we called class->write_finish() from async_ready_write_callback_wrapper.
864    */
865   return g_task_propagate_int (G_TASK (result), error);
866 }
867
868 static void
869 write_bytes_callback (GObject      *stream,
870                       GAsyncResult *result,
871                       gpointer      user_data)
872 {
873   GTask *task = user_data;
874   GError *error = NULL;
875   gssize nwrote;
876
877   nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream),
878                                          result, &error);
879   if (nwrote == -1)
880     g_task_return_error (task, error);
881   else
882     g_task_return_int (task, nwrote);
883   g_object_unref (task);
884 }
885
886 /**
887  * g_output_stream_write_bytes_async:
888  * @stream: A #GOutputStream.
889  * @bytes: The bytes to write
890  * @io_priority: the io priority of the request.
891  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
892  * @callback: (scope async): callback to call when the request is satisfied
893  * @user_data: (closure): the data to pass to callback function
894  *
895  * This function is similar to g_output_stream_write_async(), but
896  * takes a #GBytes as input.  Due to the refcounted nature of #GBytes,
897  * this allows the stream to avoid taking a copy of the data.
898  *
899  * However, note that this function may still perform partial writes,
900  * just like g_output_stream_write_async(). If that occurs, to continue
901  * writing, you will need to create a new #GBytes containing just the
902  * remaining bytes, using g_bytes_new_from_bytes(). Passing the same
903  * #GBytes instance multiple times potentially can result in duplicated
904  * data in the output stream.
905  *
906  * For the synchronous, blocking version of this function, see
907  * g_output_stream_write_bytes().
908  **/
909 void
910 g_output_stream_write_bytes_async (GOutputStream       *stream,
911                                    GBytes              *bytes,
912                                    int                  io_priority,
913                                    GCancellable        *cancellable,
914                                    GAsyncReadyCallback  callback,
915                                    gpointer             user_data)
916 {
917   GTask *task;
918   gsize size;
919   gconstpointer data;
920
921   data = g_bytes_get_data (bytes, &size);
922
923   task = g_task_new (stream, cancellable, callback, user_data);
924   g_task_set_task_data (task, g_bytes_ref (bytes),
925                         (GDestroyNotify) g_bytes_unref);
926
927   g_output_stream_write_async (stream,
928                                data, size,
929                                io_priority,
930                                cancellable,
931                                write_bytes_callback,
932                                task);
933 }
934
935 /**
936  * g_output_stream_write_bytes_finish:
937  * @stream: a #GOutputStream.
938  * @result: a #GAsyncResult.
939  * @error: a #GError location to store the error occurring, or %NULL to
940  * ignore.
941  *
942  * Finishes a stream write-from-#GBytes operation.
943  *
944  * Returns: a #gssize containing the number of bytes written to the stream.
945  **/
946 gssize
947 g_output_stream_write_bytes_finish (GOutputStream  *stream,
948                                     GAsyncResult   *result,
949                                     GError        **error)
950 {
951   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
952   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
953
954   return g_task_propagate_int (G_TASK (result), error);
955 }
956
957 static void
958 async_ready_splice_callback_wrapper (GObject      *source_object,
959                                      GAsyncResult *res,
960                                      gpointer     _data)
961 {
962   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
963   GOutputStreamClass *class;
964   GTask *task = _data;
965   gssize nspliced;
966   GError *error = NULL;
967
968   g_output_stream_clear_pending (stream);
969   
970   if (g_async_result_legacy_propagate_error (res, &error))
971     nspliced = -1;
972   else
973     {
974       class = G_OUTPUT_STREAM_GET_CLASS (stream);
975       nspliced = class->splice_finish (stream, res, &error);
976     }
977
978   if (nspliced >= 0)
979     g_task_return_int (task, nspliced);
980   else
981     g_task_return_error (task, error);
982   g_object_unref (task);
983 }
984
985 /**
986  * g_output_stream_splice_async:
987  * @stream: a #GOutputStream.
988  * @source: a #GInputStream. 
989  * @flags: a set of #GOutputStreamSpliceFlags.
990  * @io_priority: the io priority of the request.
991  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
992  * @callback: (scope async): a #GAsyncReadyCallback. 
993  * @user_data: (closure): user data passed to @callback.
994  * 
995  * Splices a stream asynchronously.
996  * When the operation is finished @callback will be called.
997  * You can then call g_output_stream_splice_finish() to get the 
998  * result of the operation.
999  *
1000  * For the synchronous, blocking version of this function, see 
1001  * g_output_stream_splice().
1002  **/
1003 void
1004 g_output_stream_splice_async (GOutputStream            *stream,
1005                               GInputStream             *source,
1006                               GOutputStreamSpliceFlags  flags,
1007                               int                       io_priority,
1008                               GCancellable             *cancellable,
1009                               GAsyncReadyCallback       callback,
1010                               gpointer                  user_data)
1011 {
1012   GOutputStreamClass *class;
1013   GTask *task;
1014   GError *error = NULL;
1015
1016   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1017   g_return_if_fail (G_IS_INPUT_STREAM (source));
1018
1019   task = g_task_new (stream, cancellable, callback, user_data);
1020   g_task_set_source_tag (task, g_output_stream_splice_async);
1021   g_task_set_priority (task, io_priority);
1022   g_task_set_task_data (task, g_object_ref (source), g_object_unref);
1023
1024   if (g_input_stream_is_closed (source))
1025     {
1026       g_task_return_new_error (task,
1027                                G_IO_ERROR, G_IO_ERROR_CLOSED,
1028                                _("Source stream is already closed"));
1029       g_object_unref (task);
1030       return;
1031     }
1032   
1033   if (!g_output_stream_set_pending (stream, &error))
1034     {
1035       g_task_return_error (task, error);
1036       g_object_unref (task);
1037       return;
1038     }
1039
1040   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1041
1042   class->splice_async (stream, source, flags, io_priority, cancellable,
1043                        async_ready_splice_callback_wrapper, task);
1044 }
1045
1046 /**
1047  * g_output_stream_splice_finish:
1048  * @stream: a #GOutputStream.
1049  * @result: a #GAsyncResult.
1050  * @error: a #GError location to store the error occurring, or %NULL to 
1051  * ignore.
1052  *
1053  * Finishes an asynchronous stream splice operation.
1054  * 
1055  * Returns: a #gssize of the number of bytes spliced. Note that if the
1056  *     number of bytes spliced is greater than %G_MAXSSIZE, then that
1057  *     will be returned, and there is no way to determine the actual
1058  *     number of bytes spliced.
1059  **/
1060 gssize
1061 g_output_stream_splice_finish (GOutputStream  *stream,
1062                                GAsyncResult   *result,
1063                                GError        **error)
1064 {
1065   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1066   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1067   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_splice_async), FALSE);
1068
1069   /* @result is always the GTask created by g_output_stream_splice_async();
1070    * we called class->splice_finish() from async_ready_splice_callback_wrapper.
1071    */
1072   return g_task_propagate_int (G_TASK (result), error);
1073 }
1074
1075 static void
1076 async_ready_flush_callback_wrapper (GObject      *source_object,
1077                                     GAsyncResult *res,
1078                                     gpointer      user_data)
1079 {
1080   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1081   GOutputStreamClass *class;
1082   GTask *task = user_data;
1083   gboolean flushed;
1084   GError *error = NULL;
1085
1086   g_output_stream_clear_pending (stream);
1087   
1088   if (g_async_result_legacy_propagate_error (res, &error))
1089     flushed = FALSE;
1090   else
1091     {
1092       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1093       flushed = class->flush_finish (stream, res, &error);
1094     }
1095
1096   if (flushed)
1097     g_task_return_boolean (task, TRUE);
1098   else
1099     g_task_return_error (task, error);
1100   g_object_unref (task);
1101 }
1102
1103 /**
1104  * g_output_stream_flush_async:
1105  * @stream: a #GOutputStream.
1106  * @io_priority: the io priority of the request.
1107  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
1108  * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied
1109  * @user_data: (closure): the data to pass to callback function
1110  * 
1111  * Forces an asynchronous write of all user-space buffered data for
1112  * the given @stream.
1113  * For behaviour details see g_output_stream_flush().
1114  *
1115  * When the operation is finished @callback will be 
1116  * called. You can then call g_output_stream_flush_finish() to get the 
1117  * result of the operation.
1118  **/
1119 void
1120 g_output_stream_flush_async (GOutputStream       *stream,
1121                              int                  io_priority,
1122                              GCancellable        *cancellable,
1123                              GAsyncReadyCallback  callback,
1124                              gpointer             user_data)
1125 {
1126   GOutputStreamClass *class;
1127   GTask *task;
1128   GError *error = NULL;
1129
1130   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1131
1132   task = g_task_new (stream, cancellable, callback, user_data);
1133   g_task_set_source_tag (task, g_output_stream_flush_async);
1134   g_task_set_priority (task, io_priority);
1135
1136   if (!g_output_stream_set_pending (stream, &error))
1137     {
1138       g_task_return_error (task, error);
1139       g_object_unref (task);
1140       return;
1141     }
1142
1143   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1144   
1145   if (class->flush_async == NULL)
1146     {
1147       g_task_return_boolean (task, TRUE);
1148       g_object_unref (task);
1149       return;
1150     }
1151       
1152   class->flush_async (stream, io_priority, cancellable,
1153                       async_ready_flush_callback_wrapper, task);
1154 }
1155
1156 /**
1157  * g_output_stream_flush_finish:
1158  * @stream: a #GOutputStream.
1159  * @result: a GAsyncResult.
1160  * @error: a #GError location to store the error occurring, or %NULL to 
1161  * ignore.
1162  * 
1163  * Finishes flushing an output stream.
1164  * 
1165  * Returns: %TRUE if flush operation succeeded, %FALSE otherwise.
1166  **/
1167 gboolean
1168 g_output_stream_flush_finish (GOutputStream  *stream,
1169                               GAsyncResult   *result,
1170                               GError        **error)
1171 {
1172   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1173   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1174   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_flush_async), FALSE);
1175
1176   /* @result is always the GTask created by g_output_stream_flush_async();
1177    * we called class->flush_finish() from async_ready_flush_callback_wrapper.
1178    */
1179   return g_task_propagate_boolean (G_TASK (result), error);
1180 }
1181
1182
1183 static void
1184 async_ready_close_callback_wrapper (GObject      *source_object,
1185                                     GAsyncResult *res,
1186                                     gpointer      user_data)
1187 {
1188   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1189   GOutputStreamClass *class;
1190   GTask *task = user_data;
1191   GError *error = g_task_get_task_data (task);
1192
1193   stream->priv->closing = FALSE;
1194   stream->priv->closed = TRUE;
1195
1196   if (!error && !g_async_result_legacy_propagate_error (res, &error))
1197     {
1198       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1199
1200       class->close_finish (stream, res,
1201                            error ? NULL : &error);
1202     }
1203
1204   if (error != NULL)
1205     g_task_return_error (task, error);
1206   else
1207     g_task_return_boolean (task, TRUE);
1208   g_object_unref (task);
1209 }
1210
1211 static void
1212 async_ready_close_flushed_callback_wrapper (GObject      *source_object,
1213                                             GAsyncResult *res,
1214                                             gpointer      user_data)
1215 {
1216   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1217   GOutputStreamClass *class;
1218   GTask *task = user_data;
1219   GError *error = NULL;
1220
1221   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1222
1223   if (!g_async_result_legacy_propagate_error (res, &error))
1224     {
1225       class->flush_finish (stream, res, &error);
1226     }
1227
1228   /* propagate the possible error */
1229   if (error)
1230     g_task_set_task_data (task, error, NULL);
1231
1232   /* we still close, even if there was a flush error */
1233   class->close_async (stream,
1234                       g_task_get_priority (task),
1235                       g_task_get_cancellable (task),
1236                       async_ready_close_callback_wrapper, task);
1237 }
1238
1239 static void
1240 real_close_async_cb (GObject      *source_object,
1241                      GAsyncResult *res,
1242                      gpointer      user_data)
1243 {
1244   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1245   GTask *task = user_data;
1246   GError *error = NULL;
1247   gboolean ret;
1248
1249   g_output_stream_clear_pending (stream);
1250
1251   ret = g_output_stream_internal_close_finish (stream, res, &error);
1252
1253   if (error != NULL)
1254     g_task_return_error (task, error);
1255   else
1256     g_task_return_boolean (task, ret);
1257
1258   g_object_unref (task);
1259 }
1260
1261 /**
1262  * g_output_stream_close_async:
1263  * @stream: A #GOutputStream.
1264  * @io_priority: the io priority of the request.
1265  * @cancellable: (allow-none): optional cancellable object
1266  * @callback: (scope async): callback to call when the request is satisfied
1267  * @user_data: (closure): the data to pass to callback function
1268  *
1269  * Requests an asynchronous close of the stream, releasing resources 
1270  * related to it. When the operation is finished @callback will be 
1271  * called. You can then call g_output_stream_close_finish() to get 
1272  * the result of the operation.
1273  *
1274  * For behaviour details see g_output_stream_close().
1275  *
1276  * The asyncronous methods have a default fallback that uses threads 
1277  * to implement asynchronicity, so they are optional for inheriting 
1278  * classes. However, if you override one you must override all.
1279  **/
1280 void
1281 g_output_stream_close_async (GOutputStream       *stream,
1282                              int                  io_priority,
1283                              GCancellable        *cancellable,
1284                              GAsyncReadyCallback  callback,
1285                              gpointer             user_data)
1286 {
1287   GTask *task;
1288   GError *error = NULL;
1289
1290   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1291   
1292   task = g_task_new (stream, cancellable, callback, user_data);
1293   g_task_set_source_tag (task, g_output_stream_close_async);
1294   g_task_set_priority (task, io_priority);
1295
1296   if (!g_output_stream_set_pending (stream, &error))
1297     {
1298       g_task_return_error (task, error);
1299       g_object_unref (task);
1300       return;
1301     }
1302
1303   g_output_stream_internal_close_async (stream, io_priority, cancellable,
1304                                         real_close_async_cb, task);
1305 }
1306
1307 /* Must always be called inside
1308  * g_output_stream_set_pending()/g_output_stream_clear_pending().
1309  */
1310 void
1311 g_output_stream_internal_close_async (GOutputStream       *stream,
1312                                       int                  io_priority,
1313                                       GCancellable        *cancellable,
1314                                       GAsyncReadyCallback  callback,
1315                                       gpointer             user_data)
1316 {
1317   GOutputStreamClass *class;
1318   GTask *task;
1319
1320   task = g_task_new (stream, cancellable, callback, user_data);
1321   g_task_set_source_tag (task, g_output_stream_internal_close_async);
1322   g_task_set_priority (task, io_priority);
1323
1324   if (stream->priv->closed)
1325     {
1326       g_task_return_boolean (task, TRUE);
1327       g_object_unref (task);
1328       return;
1329     }
1330
1331   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1332   stream->priv->closing = TRUE;
1333
1334   /* Call close_async directly if there is no need to flush, or if the flush
1335      can be done sync (in the output stream async close thread) */
1336   if (class->flush_async == NULL ||
1337       (class->flush_async == g_output_stream_real_flush_async &&
1338        (class->flush == NULL || class->close_async == g_output_stream_real_close_async)))
1339     {
1340       class->close_async (stream, io_priority, cancellable,
1341                           async_ready_close_callback_wrapper, task);
1342     }
1343   else
1344     {
1345       /* First do an async flush, then do the async close in the callback
1346          wrapper (see async_ready_close_flushed_callback_wrapper) */
1347       class->flush_async (stream, io_priority, cancellable,
1348                           async_ready_close_flushed_callback_wrapper, task);
1349     }
1350 }
1351
1352 static gboolean
1353 g_output_stream_internal_close_finish (GOutputStream  *stream,
1354                                        GAsyncResult   *result,
1355                                        GError        **error)
1356 {
1357   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1358   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1359   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_internal_close_async), FALSE);
1360
1361   return g_task_propagate_boolean (G_TASK (result), error);
1362 }
1363
1364 /**
1365  * g_output_stream_close_finish:
1366  * @stream: a #GOutputStream.
1367  * @result: a #GAsyncResult.
1368  * @error: a #GError location to store the error occurring, or %NULL to 
1369  * ignore.
1370  * 
1371  * Closes an output stream.
1372  * 
1373  * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
1374  **/
1375 gboolean
1376 g_output_stream_close_finish (GOutputStream  *stream,
1377                               GAsyncResult   *result,
1378                               GError        **error)
1379 {
1380   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1381   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1382   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_close_async), FALSE);
1383
1384   /* @result is always the GTask created by g_output_stream_close_async();
1385    * we called class->close_finish() from async_ready_close_callback_wrapper.
1386    */
1387   return g_task_propagate_boolean (G_TASK (result), error);
1388 }
1389
1390 /**
1391  * g_output_stream_is_closed:
1392  * @stream: a #GOutputStream.
1393  * 
1394  * Checks if an output stream has already been closed.
1395  * 
1396  * Returns: %TRUE if @stream is closed. %FALSE otherwise. 
1397  **/
1398 gboolean
1399 g_output_stream_is_closed (GOutputStream *stream)
1400 {
1401   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1402   
1403   return stream->priv->closed;
1404 }
1405
1406 /**
1407  * g_output_stream_is_closing:
1408  * @stream: a #GOutputStream.
1409  *
1410  * Checks if an output stream is being closed. This can be
1411  * used inside e.g. a flush implementation to see if the
1412  * flush (or other i/o operation) is called from within
1413  * the closing operation.
1414  *
1415  * Returns: %TRUE if @stream is being closed. %FALSE otherwise.
1416  *
1417  * Since: 2.24
1418  **/
1419 gboolean
1420 g_output_stream_is_closing (GOutputStream *stream)
1421 {
1422   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1423
1424   return stream->priv->closing;
1425 }
1426
1427 /**
1428  * g_output_stream_has_pending:
1429  * @stream: a #GOutputStream.
1430  * 
1431  * Checks if an ouput stream has pending actions.
1432  * 
1433  * Returns: %TRUE if @stream has pending actions. 
1434  **/
1435 gboolean
1436 g_output_stream_has_pending (GOutputStream *stream)
1437 {
1438   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1439   
1440   return stream->priv->pending;
1441 }
1442
1443 /**
1444  * g_output_stream_set_pending:
1445  * @stream: a #GOutputStream.
1446  * @error: a #GError location to store the error occurring, or %NULL to 
1447  * ignore.
1448  * 
1449  * Sets @stream to have actions pending. If the pending flag is
1450  * already set or @stream is closed, it will return %FALSE and set
1451  * @error.
1452  *
1453  * Returns: %TRUE if pending was previously unset and is now set.
1454  **/
1455 gboolean
1456 g_output_stream_set_pending (GOutputStream *stream,
1457                              GError **error)
1458 {
1459   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1460   
1461   if (stream->priv->closed)
1462     {
1463       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1464                            _("Stream is already closed"));
1465       return FALSE;
1466     }
1467   
1468   if (stream->priv->pending)
1469     {
1470       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1471                            /* Translators: This is an error you get if there is
1472                             * already an operation running against this stream when
1473                             * you try to start one */
1474                            _("Stream has outstanding operation"));
1475       return FALSE;
1476     }
1477   
1478   stream->priv->pending = TRUE;
1479   return TRUE;
1480 }
1481
1482 /**
1483  * g_output_stream_clear_pending:
1484  * @stream: output stream
1485  * 
1486  * Clears the pending flag on @stream.
1487  **/
1488 void
1489 g_output_stream_clear_pending (GOutputStream *stream)
1490 {
1491   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1492   
1493   stream->priv->pending = FALSE;
1494 }
1495
1496 /**
1497  * g_output_stream_async_write_is_via_threads:
1498  * @stream: a #GOutputStream.
1499  *
1500  * Checks if an ouput stream's write_async function uses threads.
1501  *
1502  * Returns: %TRUE if @stream's write_async function uses threads.
1503  **/
1504 gboolean
1505 g_output_stream_async_write_is_via_threads (GOutputStream *stream)
1506 {
1507   GOutputStreamClass *class;
1508
1509   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1510
1511   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1512
1513   return (class->write_async == g_output_stream_real_write_async &&
1514       !(G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
1515         g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
1516 }
1517
1518
1519 /********************************************
1520  *   Default implementation of async ops    *
1521  ********************************************/
1522
1523 typedef struct {
1524   const void         *buffer;
1525   gsize               count_requested;
1526   gssize              count_written;
1527 } WriteData;
1528
1529 static void
1530 free_write_data (WriteData *op)
1531 {
1532   g_slice_free (WriteData, op);
1533 }
1534
1535 static void
1536 write_async_thread (GTask        *task,
1537                     gpointer      source_object,
1538                     gpointer      task_data,
1539                     GCancellable *cancellable)
1540 {
1541   GOutputStream *stream = source_object;
1542   WriteData *op = task_data;
1543   GOutputStreamClass *class;
1544   GError *error = NULL;
1545   gssize count_written;
1546
1547   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1548   count_written = class->write_fn (stream, op->buffer, op->count_requested,
1549                                    cancellable, &error);
1550   if (count_written == -1)
1551     g_task_return_error (task, error);
1552   else
1553     g_task_return_int (task, count_written);
1554 }
1555
1556 static void write_async_pollable (GPollableOutputStream *stream,
1557                                   GTask                 *task);
1558
1559 static gboolean
1560 write_async_pollable_ready (GPollableOutputStream *stream,
1561                             gpointer               user_data)
1562 {
1563   GTask *task = user_data;
1564
1565   write_async_pollable (stream, task);
1566   return FALSE;
1567 }
1568
1569 static void
1570 write_async_pollable (GPollableOutputStream *stream,
1571                       GTask                 *task)
1572 {
1573   GError *error = NULL;
1574   WriteData *op = g_task_get_task_data (task);
1575   gssize count_written;
1576
1577   if (g_task_return_error_if_cancelled (task))
1578     return;
1579
1580   count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
1581     write_nonblocking (stream, op->buffer, op->count_requested, &error);
1582
1583   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1584     {
1585       GSource *source;
1586
1587       g_error_free (error);
1588
1589       source = g_pollable_output_stream_create_source (stream,
1590                                                        g_task_get_cancellable (task));
1591       g_task_attach_source (task, source,
1592                             (GSourceFunc) write_async_pollable_ready);
1593       g_source_unref (source);
1594       return;
1595     }
1596
1597   if (count_written == -1)
1598     g_task_return_error (task, error);
1599   else
1600     g_task_return_int (task, count_written);
1601 }
1602
1603 static void
1604 g_output_stream_real_write_async (GOutputStream       *stream,
1605                                   const void          *buffer,
1606                                   gsize                count,
1607                                   int                  io_priority,
1608                                   GCancellable        *cancellable,
1609                                   GAsyncReadyCallback  callback,
1610                                   gpointer             user_data)
1611 {
1612   GTask *task;
1613   WriteData *op;
1614
1615   op = g_slice_new0 (WriteData);
1616   task = g_task_new (stream, cancellable, callback, user_data);
1617   g_task_set_check_cancellable (task, FALSE);
1618   g_task_set_task_data (task, op, (GDestroyNotify) free_write_data);
1619   op->buffer = buffer;
1620   op->count_requested = count;
1621
1622   if (!g_output_stream_async_write_is_via_threads (stream))
1623     write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
1624   else
1625     g_task_run_in_thread (task, write_async_thread);
1626   g_object_unref (task);
1627 }
1628
1629 static gssize
1630 g_output_stream_real_write_finish (GOutputStream  *stream,
1631                                    GAsyncResult   *result,
1632                                    GError        **error)
1633 {
1634   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1635
1636   return g_task_propagate_int (G_TASK (result), error);
1637 }
1638
1639 typedef struct {
1640   GInputStream *source;
1641   GOutputStreamSpliceFlags flags;
1642   gssize n_read;
1643   gssize n_written;
1644   gsize bytes_copied;
1645   GError *error;
1646   guint8 *buffer;
1647 } SpliceData;
1648
1649 static void
1650 free_splice_data (SpliceData *op)
1651 {
1652   g_clear_pointer (&op->buffer, g_free);
1653   g_object_unref (op->source);
1654   g_clear_error (&op->error);
1655   g_free (op);
1656 }
1657
1658 static void
1659 real_splice_async_complete_cb (GTask *task)
1660 {
1661   SpliceData *op = g_task_get_task_data (task);
1662
1663   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE &&
1664       !g_input_stream_is_closed (op->source))
1665     return;
1666
1667   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET &&
1668       !g_output_stream_is_closed (g_task_get_source_object (task)))
1669     return;
1670
1671   if (op->error != NULL)
1672     {
1673       g_task_return_error (task, op->error);
1674       op->error = NULL;
1675     }
1676   else
1677     {
1678       g_task_return_int (task, op->bytes_copied);
1679     }
1680
1681   g_object_unref (task);
1682 }
1683
1684 static void
1685 real_splice_async_close_input_cb (GObject      *source,
1686                                   GAsyncResult *res,
1687                                   gpointer      user_data)
1688 {
1689   GTask *task = user_data;
1690
1691   g_input_stream_close_finish (G_INPUT_STREAM (source), res, NULL);
1692
1693   real_splice_async_complete_cb (task);
1694 }
1695
1696 static void
1697 real_splice_async_close_output_cb (GObject      *source,
1698                                    GAsyncResult *res,
1699                                    gpointer      user_data)
1700 {
1701   GTask *task = G_TASK (user_data);
1702   SpliceData *op = g_task_get_task_data (task);
1703   GError **error = (op->error == NULL) ? &op->error : NULL;
1704
1705   g_output_stream_internal_close_finish (G_OUTPUT_STREAM (source), res, error);
1706
1707   real_splice_async_complete_cb (task);
1708 }
1709
1710 static void
1711 real_splice_async_complete (GTask *task)
1712 {
1713   SpliceData *op = g_task_get_task_data (task);
1714   gboolean done = TRUE;
1715
1716   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
1717     {
1718       done = FALSE;
1719       g_input_stream_close_async (op->source, g_task_get_priority (task),
1720                                   g_task_get_cancellable (task),
1721                                   real_splice_async_close_input_cb, task);
1722     }
1723
1724   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
1725     {
1726       done = FALSE;
1727       g_output_stream_internal_close_async (g_task_get_source_object (task),
1728                                             g_task_get_priority (task),
1729                                             g_task_get_cancellable (task),
1730                                             real_splice_async_close_output_cb,
1731                                             task);
1732     }
1733
1734   if (done)
1735     real_splice_async_complete_cb (task);
1736 }
1737
1738 static void real_splice_async_read_cb (GObject      *source,
1739                                        GAsyncResult *res,
1740                                        gpointer      user_data);
1741
1742 static void
1743 real_splice_async_write_cb (GObject      *source,
1744                             GAsyncResult *res,
1745                             gpointer      user_data)
1746 {
1747   GOutputStreamClass *class;
1748   GTask *task = G_TASK (user_data);
1749   SpliceData *op = g_task_get_task_data (task);
1750   gssize ret;
1751
1752   class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
1753
1754   ret = class->write_finish (G_OUTPUT_STREAM (source), res, &op->error);
1755
1756   if (ret == -1)
1757     {
1758       real_splice_async_complete (task);
1759       return;
1760     }
1761
1762   op->n_written += ret;
1763   op->bytes_copied += ret;
1764   if (op->bytes_copied > G_MAXSSIZE)
1765     op->bytes_copied = G_MAXSSIZE;
1766
1767   if (op->n_written < op->n_read)
1768     {
1769       class->write_async (g_task_get_source_object (task),
1770                           op->buffer + op->n_written,
1771                           op->n_read - op->n_written,
1772                           g_task_get_priority (task),
1773                           g_task_get_cancellable (task),
1774                           real_splice_async_write_cb, task);
1775       return;
1776     }
1777
1778   g_input_stream_read_async (op->source, op->buffer, 8192,
1779                              g_task_get_priority (task),
1780                              g_task_get_cancellable (task),
1781                              real_splice_async_read_cb, task);
1782 }
1783
1784 static void
1785 real_splice_async_read_cb (GObject      *source,
1786                            GAsyncResult *res,
1787                            gpointer      user_data)
1788 {
1789   GOutputStreamClass *class;
1790   GTask *task = G_TASK (user_data);
1791   SpliceData *op = g_task_get_task_data (task);
1792   gssize ret;
1793
1794   class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
1795
1796   ret = g_input_stream_read_finish (op->source, res, &op->error);
1797   if (ret == -1 || ret == 0)
1798     {
1799       real_splice_async_complete (task);
1800       return;
1801     }
1802
1803   op->n_read = ret;
1804   op->n_written = 0;
1805
1806   class->write_async (g_task_get_source_object (task), op->buffer,
1807                       op->n_read, g_task_get_priority (task),
1808                       g_task_get_cancellable (task),
1809                       real_splice_async_write_cb, task);
1810 }
1811
1812 static void
1813 splice_async_thread (GTask        *task,
1814                      gpointer      source_object,
1815                      gpointer      task_data,
1816                      GCancellable *cancellable)
1817 {
1818   GOutputStream *stream = source_object;
1819   SpliceData *op = task_data;
1820   GOutputStreamClass *class;
1821   GError *error = NULL;
1822   gssize bytes_copied;
1823
1824   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1825   
1826   bytes_copied = class->splice (stream,
1827                                 op->source,
1828                                 op->flags,
1829                                 cancellable,
1830                                 &error);
1831   if (bytes_copied == -1)
1832     g_task_return_error (task, error);
1833   else
1834     g_task_return_int (task, bytes_copied);
1835 }
1836
1837 static void
1838 g_output_stream_real_splice_async (GOutputStream             *stream,
1839                                    GInputStream              *source,
1840                                    GOutputStreamSpliceFlags   flags,
1841                                    int                        io_priority,
1842                                    GCancellable              *cancellable,
1843                                    GAsyncReadyCallback        callback,
1844                                    gpointer                   user_data)
1845 {
1846   GTask *task;
1847   SpliceData *op;
1848
1849   op = g_new0 (SpliceData, 1);
1850   task = g_task_new (stream, cancellable, callback, user_data);
1851   g_task_set_task_data (task, op, (GDestroyNotify)free_splice_data);
1852   op->flags = flags;
1853   op->source = g_object_ref (source);
1854
1855   if (g_input_stream_async_read_is_via_threads (source) &&
1856       g_output_stream_async_write_is_via_threads (stream))
1857     {
1858       g_task_run_in_thread (task, splice_async_thread);
1859       g_object_unref (task);
1860     }
1861   else
1862     {
1863       op->buffer = g_malloc (8192);
1864       g_input_stream_read_async (op->source, op->buffer, 8192,
1865                                  g_task_get_priority (task),
1866                                  g_task_get_cancellable (task),
1867                                  real_splice_async_read_cb, task);
1868     }
1869 }
1870
1871 static gssize
1872 g_output_stream_real_splice_finish (GOutputStream  *stream,
1873                                     GAsyncResult   *result,
1874                                     GError        **error)
1875 {
1876   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1877
1878   return g_task_propagate_int (G_TASK (result), error);
1879 }
1880
1881
1882 static void
1883 flush_async_thread (GTask        *task,
1884                     gpointer      source_object,
1885                     gpointer      task_data,
1886                     GCancellable *cancellable)
1887 {
1888   GOutputStream *stream = source_object;
1889   GOutputStreamClass *class;
1890   gboolean result;
1891   GError *error = NULL;
1892
1893   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1894   result = TRUE;
1895   if (class->flush)
1896     result = class->flush (stream, cancellable, &error);
1897
1898   if (result)
1899     g_task_return_boolean (task, TRUE);
1900   else
1901     g_task_return_error (task, error);
1902 }
1903
1904 static void
1905 g_output_stream_real_flush_async (GOutputStream       *stream,
1906                                   int                  io_priority,
1907                                   GCancellable        *cancellable,
1908                                   GAsyncReadyCallback  callback,
1909                                   gpointer             user_data)
1910 {
1911   GTask *task;
1912
1913   task = g_task_new (stream, cancellable, callback, user_data);
1914   g_task_set_priority (task, io_priority);
1915   g_task_run_in_thread (task, flush_async_thread);
1916   g_object_unref (task);
1917 }
1918
1919 static gboolean
1920 g_output_stream_real_flush_finish (GOutputStream  *stream,
1921                                    GAsyncResult   *result,
1922                                    GError        **error)
1923 {
1924   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1925
1926   return g_task_propagate_boolean (G_TASK (result), error);
1927 }
1928
1929 static void
1930 close_async_thread (GTask        *task,
1931                     gpointer      source_object,
1932                     gpointer      task_data,
1933                     GCancellable *cancellable)
1934 {
1935   GOutputStream *stream = source_object;
1936   GOutputStreamClass *class;
1937   GError *error = NULL;
1938   gboolean result = TRUE;
1939
1940   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1941
1942   /* Do a flush here if there is a flush function, and we did not have to do
1943    * an async flush before (see g_output_stream_close_async)
1944    */
1945   if (class->flush != NULL &&
1946       (class->flush_async == NULL ||
1947        class->flush_async == g_output_stream_real_flush_async))
1948     {
1949       result = class->flush (stream, cancellable, &error);
1950     }
1951
1952   /* Auto handling of cancelation disabled, and ignore
1953      cancellation, since we want to close things anyway, although
1954      possibly in a quick-n-dirty way. At least we never want to leak
1955      open handles */
1956
1957   if (class->close_fn)
1958     {
1959       /* Make sure to close, even if the flush failed (see sync close) */
1960       if (!result)
1961         class->close_fn (stream, cancellable, NULL);
1962       else
1963         result = class->close_fn (stream, cancellable, &error);
1964     }
1965
1966   if (result)
1967     g_task_return_boolean (task, TRUE);
1968   else
1969     g_task_return_error (task, error);
1970 }
1971
1972 static void
1973 g_output_stream_real_close_async (GOutputStream       *stream,
1974                                   int                  io_priority,
1975                                   GCancellable        *cancellable,
1976                                   GAsyncReadyCallback  callback,
1977                                   gpointer             user_data)
1978 {
1979   GTask *task;
1980
1981   task = g_task_new (stream, cancellable, callback, user_data);
1982   g_task_set_priority (task, io_priority);
1983   g_task_run_in_thread (task, close_async_thread);
1984   g_object_unref (task);
1985 }
1986
1987 static gboolean
1988 g_output_stream_real_close_finish (GOutputStream  *stream,
1989                                    GAsyncResult   *result,
1990                                    GError        **error)
1991 {
1992   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1993
1994   return g_task_propagate_boolean (G_TASK (result), error);
1995 }