Updated FSF's address
[platform/upstream/glib.git] / gio / goutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
17  *
18  * Author: Alexander Larsson <alexl@redhat.com>
19  */
20
21 #include "config.h"
22 #include <string.h>
23 #include "goutputstream.h"
24 #include "gcancellable.h"
25 #include "gasyncresult.h"
26 #include "gtask.h"
27 #include "ginputstream.h"
28 #include "gioerror.h"
29 #include "gioprivate.h"
30 #include "glibintl.h"
31 #include "gpollableoutputstream.h"
32
33 /**
34  * SECTION:goutputstream
35  * @short_description: Base class for implementing streaming output
36  * @include: gio/gio.h
37  *
38  * #GOutputStream has functions to write to a stream (g_output_stream_write()),
39  * to close a stream (g_output_stream_close()) and to flush pending writes
40  * (g_output_stream_flush()). 
41  *
42  * To copy the content of an input stream to an output stream without 
43  * manually handling the reads and writes, use g_output_stream_splice(). 
44  *
45  * All of these functions have async variants too.
46  **/
47
/* Per-instance private state of a GOutputStream. */
struct _GOutputStreamPrivate {
  /* Set once g_output_stream_internal_close() has run (even if it failed). */
  guint closed : 1;
  /* Presumably toggled by g_output_stream_set_pending() /
   * g_output_stream_clear_pending(); those are defined outside this view. */
  guint pending : 1;
  /* TRUE only while g_output_stream_internal_close() is executing. */
  guint closing : 1;
  /* NOTE(review): not referenced anywhere in the visible part of the file. */
  GAsyncReadyCallback outstanding_callback;
};
54
/* Registers GOutputStream as an abstract type derived from G_TYPE_OBJECT
 * with GOutputStreamPrivate as its instance-private data. */
G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GOutputStream, g_output_stream, G_TYPE_OBJECT)

/* Default virtual-method implementations; g_output_stream_class_init()
 * installs each of them into the class structure. */
static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
                                                    GInputStream              *source,
                                                    GOutputStreamSpliceFlags   flags,
                                                    GCancellable              *cancellable,
                                                    GError                   **error);
static void     g_output_stream_real_write_async   (GOutputStream             *stream,
                                                    const void                *buffer,
                                                    gsize                      count,
                                                    int                        io_priority,
                                                    GCancellable              *cancellable,
                                                    GAsyncReadyCallback        callback,
                                                    gpointer                   data);
static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
                                                    GAsyncResult              *result,
                                                    GError                   **error);
static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
                                                    GInputStream              *source,
                                                    GOutputStreamSpliceFlags   flags,
                                                    int                        io_priority,
                                                    GCancellable              *cancellable,
                                                    GAsyncReadyCallback        callback,
                                                    gpointer                   data);
static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
                                                    GAsyncResult              *result,
                                                    GError                   **error);
static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
                                                    int                        io_priority,
                                                    GCancellable              *cancellable,
                                                    GAsyncReadyCallback        callback,
                                                    gpointer                   data);
static gboolean g_output_stream_real_flush_finish  (GOutputStream             *stream,
                                                    GAsyncResult              *result,
                                                    GError                   **error);
static void     g_output_stream_real_close_async   (GOutputStream             *stream,
                                                    int                        io_priority,
                                                    GCancellable              *cancellable,
                                                    GAsyncReadyCallback        callback,
                                                    gpointer                   data);
static gboolean g_output_stream_real_close_finish  (GOutputStream             *stream,
                                                    GAsyncResult              *result,
                                                    GError                   **error);
/* Internal close helpers. The synchronous one is used by
 * g_output_stream_close() and by the default splice implementation; the
 * async pair is defined outside the part of the file visible here. */
static gboolean g_output_stream_internal_close     (GOutputStream             *stream,
                                                    GCancellable              *cancellable,
                                                    GError                   **error);
static void     g_output_stream_internal_close_async (GOutputStream           *stream,
                                                      int                      io_priority,
                                                      GCancellable            *cancellable,
                                                      GAsyncReadyCallback      callback,
                                                      gpointer                 data);
static gboolean g_output_stream_internal_close_finish (GOutputStream          *stream,
                                                       GAsyncResult           *result,
                                                       GError                **error);

109
110 static void
111 g_output_stream_dispose (GObject *object)
112 {
113   GOutputStream *stream;
114
115   stream = G_OUTPUT_STREAM (object);
116   
117   if (!stream->priv->closed)
118     g_output_stream_close (stream, NULL, NULL);
119
120   G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object);
121 }
122
123 static void
124 g_output_stream_class_init (GOutputStreamClass *klass)
125 {
126   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
127
128   gobject_class->dispose = g_output_stream_dispose;
129
130   klass->splice = g_output_stream_real_splice;
131   
132   klass->write_async = g_output_stream_real_write_async;
133   klass->write_finish = g_output_stream_real_write_finish;
134   klass->splice_async = g_output_stream_real_splice_async;
135   klass->splice_finish = g_output_stream_real_splice_finish;
136   klass->flush_async = g_output_stream_real_flush_async;
137   klass->flush_finish = g_output_stream_real_flush_finish;
138   klass->close_async = g_output_stream_real_close_async;
139   klass->close_finish = g_output_stream_real_close_finish;
140 }
141
142 static void
143 g_output_stream_init (GOutputStream *stream)
144 {
145   stream->priv = g_output_stream_get_instance_private (stream);
146 }
147
148 /**
149  * g_output_stream_write:
150  * @stream: a #GOutputStream.
151  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
152  * @count: the number of bytes to write
153  * @cancellable: (allow-none): optional cancellable object
154  * @error: location to store the error occurring, or %NULL to ignore
155  *
156  * Tries to write @count bytes from @buffer into the stream. Will block
157  * during the operation.
158  * 
159  * If count is 0, returns 0 and does nothing. A value of @count
160  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
161  *
162  * On success, the number of bytes written to the stream is returned.
163  * It is not an error if this is not the same as the requested size, as it
164  * can happen e.g. on a partial I/O error, or if there is not enough
165  * storage in the stream. All writes block until at least one byte
166  * is written or an error occurs; 0 is never returned (unless
167  * @count is 0).
168  * 
169  * If @cancellable is not %NULL, then the operation can be cancelled by
170  * triggering the cancellable object from another thread. If the operation
171  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
172  * operation was partially finished when the operation was cancelled the
173  * partial result will be returned, without an error.
174  *
175  * On error -1 is returned and @error is set accordingly.
176  * 
177  * Virtual: write_fn
178  *
179  * Return value: Number of bytes written, or -1 on error
180  **/
181 gssize
182 g_output_stream_write (GOutputStream  *stream,
183                        const void     *buffer,
184                        gsize           count,
185                        GCancellable   *cancellable,
186                        GError        **error)
187 {
188   GOutputStreamClass *class;
189   gssize res;
190
191   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
192   g_return_val_if_fail (buffer != NULL, 0);
193
194   if (count == 0)
195     return 0;
196   
197   if (((gssize) count) < 0)
198     {
199       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
200                    _("Too large count value passed to %s"), G_STRFUNC);
201       return -1;
202     }
203
204   class = G_OUTPUT_STREAM_GET_CLASS (stream);
205
206   if (class->write_fn == NULL) 
207     {
208       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
209                            _("Output stream doesn't implement write"));
210       return -1;
211     }
212   
213   if (!g_output_stream_set_pending (stream, error))
214     return -1;
215   
216   if (cancellable)
217     g_cancellable_push_current (cancellable);
218   
219   res = class->write_fn (stream, buffer, count, cancellable, error);
220   
221   if (cancellable)
222     g_cancellable_pop_current (cancellable);
223   
224   g_output_stream_clear_pending (stream);
225
226   return res; 
227 }
228
229 /**
230  * g_output_stream_write_all:
231  * @stream: a #GOutputStream.
232  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
233  * @count: the number of bytes to write
234  * @bytes_written: (out): location to store the number of bytes that was 
235  *     written to the stream
236  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
237  * @error: location to store the error occurring, or %NULL to ignore
238  *
239  * Tries to write @count bytes from @buffer into the stream. Will block
240  * during the operation.
241  * 
242  * This function is similar to g_output_stream_write(), except it tries to
243  * write as many bytes as requested, only stopping on an error.
244  *
245  * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
246  * is set to @count.
247  * 
248  * If there is an error during the operation %FALSE is returned and @error
249  * is set to indicate the error status, @bytes_written is updated to contain
250  * the number of bytes written into the stream before the error occurred.
251  *
252  * Return value: %TRUE on success, %FALSE if there was an error
253  **/
254 gboolean
255 g_output_stream_write_all (GOutputStream  *stream,
256                            const void     *buffer,
257                            gsize           count,
258                            gsize          *bytes_written,
259                            GCancellable   *cancellable,
260                            GError        **error)
261 {
262   gsize _bytes_written;
263   gssize res;
264
265   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
266   g_return_val_if_fail (buffer != NULL, FALSE);
267
268   _bytes_written = 0;
269   while (_bytes_written < count)
270     {
271       res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
272                                    cancellable, error);
273       if (res == -1)
274         {
275           if (bytes_written)
276             *bytes_written = _bytes_written;
277           return FALSE;
278         }
279       
280       if (res == 0)
281         g_warning ("Write returned zero without error");
282
283       _bytes_written += res;
284     }
285   
286   if (bytes_written)
287     *bytes_written = _bytes_written;
288
289   return TRUE;
290 }
291
292 /**
293  * g_output_stream_printf:
294  * @stream: a #GOutputStream.
295  * @bytes_written: (out): location to store the number of bytes that was
296  *     written to the stream
297  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
298  * @error: location to store the error occurring, or %NULL to ignore
299  * @format: the format string. See the printf() documentation
300  * @...: the parameters to insert into the format string
301  *
302  * This is a utility function around g_output_stream_write_all(). It
303  * uses g_strdup_vprintf() to turn @format and @... into a string that
304  * is then written to @stream.
305  *
306  * See the documentation of g_output_stream_write_all() about the
307  * behavior of the actual write operation.
308  *
309  * Note that partial writes cannot be properly checked with this
310  * function due to the variable length of the written string, if you
311  * need precise control over partial write failures, you need to
 * create your own printf()-like wrapper around g_output_stream_write()
313  * or g_output_stream_write_all().
314  *
315  * Since: 2.40
316  *
317  * Return value: %TRUE on success, %FALSE if there was an error
318  **/
319 gboolean
320 g_output_stream_printf (GOutputStream  *stream,
321                         gsize          *bytes_written,
322                         GCancellable   *cancellable,
323                         GError        **error,
324                         const gchar    *format,
325                         ...)
326 {
327   va_list  args;
328   gboolean success;
329
330   va_start (args, format);
331   success = g_output_stream_vprintf (stream, bytes_written, cancellable,
332                                      error, format, args);
333   va_end (args);
334
335   return success;
336 }
337
338 /**
339  * g_output_stream_vprintf:
340  * @stream: a #GOutputStream.
341  * @bytes_written: (out): location to store the number of bytes that was
342  *     written to the stream
343  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
344  * @error: location to store the error occurring, or %NULL to ignore
345  * @format: the format string. See the printf() documentation
346  * @args: the parameters to insert into the format string
347  *
348  * This is a utility function around g_output_stream_write_all(). It
349  * uses g_strdup_vprintf() to turn @format and @args into a string that
350  * is then written to @stream.
351  *
352  * See the documentation of g_output_stream_write_all() about the
353  * behavior of the actual write operation.
354  *
355  * Note that partial writes cannot be properly checked with this
356  * function due to the variable length of the written string, if you
357  * need precise control over partial write failures, you need to
 * create your own printf()-like wrapper around g_output_stream_write()
359  * or g_output_stream_write_all().
360  *
361  * Since: 2.40
362  *
363  * Return value: %TRUE on success, %FALSE if there was an error
364  **/
365 gboolean
366 g_output_stream_vprintf (GOutputStream  *stream,
367                          gsize          *bytes_written,
368                          GCancellable   *cancellable,
369                          GError        **error,
370                          const gchar    *format,
371                          va_list         args)
372 {
373   gchar    *text;
374   gboolean  success;
375
376   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
377   g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (stream), FALSE);
378   g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
379   g_return_val_if_fail (format != NULL, FALSE);
380
381   text = g_strdup_vprintf (format, args);
382   success = g_output_stream_write_all (stream,
383                                        text, strlen (text),
384                                        bytes_written, cancellable, error);
385   g_free (text);
386
387   return success;
388 }
389
390 /**
391  * g_output_stream_write_bytes:
392  * @stream: a #GOutputStream.
393  * @bytes: the #GBytes to write
394  * @cancellable: (allow-none): optional cancellable object
395  * @error: location to store the error occurring, or %NULL to ignore
396  *
397  * A wrapper function for g_output_stream_write() which takes a
398  * #GBytes as input.  This can be more convenient for use by language
399  * bindings or in other cases where the refcounted nature of #GBytes
400  * is helpful over a bare pointer interface.
401  *
402  * However, note that this function <emphasis>may</emphasis> still
403  * perform partial writes, just like g_output_stream_write().  If that
404  * occurs, to continue writing, you will need to create a new #GBytes
405  * containing just the remaining bytes, using
406  * g_bytes_new_from_bytes().  Passing the same #GBytes instance
407  * multiple times potentially can result in duplicated data in the
408  * output stream.
409  *
410  * Return value: Number of bytes written, or -1 on error
411  **/
412 gssize
413 g_output_stream_write_bytes (GOutputStream  *stream,
414                              GBytes         *bytes,
415                              GCancellable   *cancellable,
416                              GError        **error)
417 {
418   gsize size;
419   gconstpointer data;
420
421   data = g_bytes_get_data (bytes, &size);
422
423   return g_output_stream_write (stream,
424                                 data, size,
425                                 cancellable,
426                                 error);
427 }
428
429 /**
430  * g_output_stream_flush:
431  * @stream: a #GOutputStream.
432  * @cancellable: (allow-none): optional cancellable object
433  * @error: location to store the error occurring, or %NULL to ignore
434  *
435  * Forces a write of all user-space buffered data for the given
436  * @stream. Will block during the operation. Closing the stream will
437  * implicitly cause a flush.
438  *
439  * This function is optional for inherited classes.
440  * 
441  * If @cancellable is not %NULL, then the operation can be cancelled by
442  * triggering the cancellable object from another thread. If the operation
443  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
444  *
445  * Return value: %TRUE on success, %FALSE on error
446  **/
447 gboolean
448 g_output_stream_flush (GOutputStream  *stream,
449                        GCancellable   *cancellable,
450                        GError        **error)
451 {
452   GOutputStreamClass *class;
453   gboolean res;
454
455   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
456
457   if (!g_output_stream_set_pending (stream, error))
458     return FALSE;
459   
460   class = G_OUTPUT_STREAM_GET_CLASS (stream);
461
462   res = TRUE;
463   if (class->flush)
464     {
465       if (cancellable)
466         g_cancellable_push_current (cancellable);
467       
468       res = class->flush (stream, cancellable, error);
469       
470       if (cancellable)
471         g_cancellable_pop_current (cancellable);
472     }
473   
474   g_output_stream_clear_pending (stream);
475
476   return res;
477 }
478
479 /**
480  * g_output_stream_splice:
481  * @stream: a #GOutputStream.
482  * @source: a #GInputStream.
483  * @flags: a set of #GOutputStreamSpliceFlags.
484  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
485  * @error: a #GError location to store the error occurring, or %NULL to
486  * ignore.
487  *
488  * Splices an input stream into an output stream.
489  *
490  * Returns: a #gssize containing the size of the data spliced, or
491  *     -1 if an error occurred. Note that if the number of bytes
492  *     spliced is greater than %G_MAXSSIZE, then that will be
493  *     returned, and there is no way to determine the actual number
494  *     of bytes spliced.
495  **/
496 gssize
497 g_output_stream_splice (GOutputStream             *stream,
498                         GInputStream              *source,
499                         GOutputStreamSpliceFlags   flags,
500                         GCancellable              *cancellable,
501                         GError                   **error)
502 {
503   GOutputStreamClass *class;
504   gssize bytes_copied;
505
506   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
507   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
508
509   if (g_input_stream_is_closed (source))
510     {
511       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
512                            _("Source stream is already closed"));
513       return -1;
514     }
515
516   if (!g_output_stream_set_pending (stream, error))
517     return -1;
518
519   class = G_OUTPUT_STREAM_GET_CLASS (stream);
520
521   if (cancellable)
522     g_cancellable_push_current (cancellable);
523
524   bytes_copied = class->splice (stream, source, flags, cancellable, error);
525
526   if (cancellable)
527     g_cancellable_pop_current (cancellable);
528
529   g_output_stream_clear_pending (stream);
530
531   return bytes_copied;
532 }
533
534 static gssize
535 g_output_stream_real_splice (GOutputStream             *stream,
536                              GInputStream              *source,
537                              GOutputStreamSpliceFlags   flags,
538                              GCancellable              *cancellable,
539                              GError                   **error)
540 {
541   GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
542   gssize n_read, n_written;
543   gsize bytes_copied;
544   char buffer[8192], *p;
545   gboolean res;
546
547   bytes_copied = 0;
548   if (class->write_fn == NULL)
549     {
550       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
551                            _("Output stream doesn't implement write"));
552       res = FALSE;
553       goto notsupported;
554     }
555
556   res = TRUE;
557   do
558     {
559       n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
560       if (n_read == -1)
561         {
562           res = FALSE;
563           break;
564         }
565
566       if (n_read == 0)
567         break;
568
569       p = buffer;
570       while (n_read > 0)
571         {
572           n_written = class->write_fn (stream, p, n_read, cancellable, error);
573           if (n_written == -1)
574             {
575               res = FALSE;
576               break;
577             }
578
579           p += n_written;
580           n_read -= n_written;
581           bytes_copied += n_written;
582         }
583
584       if (bytes_copied > G_MAXSSIZE)
585         bytes_copied = G_MAXSSIZE;
586     }
587   while (res);
588
589  notsupported:
590   if (!res)
591     error = NULL; /* Ignore further errors */
592
593   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
594     {
595       /* Don't care about errors in source here */
596       g_input_stream_close (source, cancellable, NULL);
597     }
598
599   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
600     {
601       /* But write errors on close are bad! */
602       res = g_output_stream_internal_close (stream, cancellable, error);
603     }
604
605   if (res)
606     return bytes_copied;
607
608   return -1;
609 }
610
611 /* Must always be called inside
612  * g_output_stream_set_pending()/g_output_stream_clear_pending(). */
613 static gboolean
614 g_output_stream_internal_close (GOutputStream  *stream,
615                                 GCancellable   *cancellable,
616                                 GError        **error)
617 {
618   GOutputStreamClass *class;
619   gboolean res;
620
621   if (stream->priv->closed)
622     return TRUE;
623
624   class = G_OUTPUT_STREAM_GET_CLASS (stream);
625
626   stream->priv->closing = TRUE;
627
628   if (cancellable)
629     g_cancellable_push_current (cancellable);
630
631   if (class->flush)
632     res = class->flush (stream, cancellable, error);
633   else
634     res = TRUE;
635
636   if (!res)
637     {
638       /* flushing caused the error that we want to return,
639        * but we still want to close the underlying stream if possible
640        */
641       if (class->close_fn)
642         class->close_fn (stream, cancellable, NULL);
643     }
644   else
645     {
646       res = TRUE;
647       if (class->close_fn)
648         res = class->close_fn (stream, cancellable, error);
649     }
650
651   if (cancellable)
652     g_cancellable_pop_current (cancellable);
653
654   stream->priv->closing = FALSE;
655   stream->priv->closed = TRUE;
656
657   return res;
658 }
659
660 /**
661  * g_output_stream_close:
662  * @stream: A #GOutputStream.
663  * @cancellable: (allow-none): optional cancellable object
664  * @error: location to store the error occurring, or %NULL to ignore
665  *
666  * Closes the stream, releasing resources related to it.
667  *
668  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
669  * Closing a stream multiple times will not return an error.
670  *
671  * Closing a stream will automatically flush any outstanding buffers in the
672  * stream.
673  *
674  * Streams will be automatically closed when the last reference
675  * is dropped, but you might want to call this function to make sure 
676  * resources are released as early as possible.
677  *
678  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
679  * open after the stream is closed. See the documentation for the individual
680  * stream for details.
681  *
682  * On failure the first error that happened will be reported, but the close
683  * operation will finish as much as possible. A stream that failed to
684  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
685  * is important to check and report the error to the user, otherwise
686  * there might be a loss of data as all data might not be written.
687  * 
688  * If @cancellable is not %NULL, then the operation can be cancelled by
689  * triggering the cancellable object from another thread. If the operation
690  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
 * Cancelling a close will still leave the stream closed, but some streams
692  * can use a faster close that doesn't block to e.g. check errors. On
693  * cancellation (as with any error) there is no guarantee that all written
694  * data will reach the target. 
695  *
696  * Return value: %TRUE on success, %FALSE on failure
697  **/
698 gboolean
699 g_output_stream_close (GOutputStream  *stream,
700                        GCancellable   *cancellable,
701                        GError        **error)
702 {
703   gboolean res;
704
705   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
706
707   if (stream->priv->closed)
708     return TRUE;
709
710   if (!g_output_stream_set_pending (stream, error))
711     return FALSE;
712
713   res = g_output_stream_internal_close (stream, cancellable, error);
714
715   g_output_stream_clear_pending (stream);
716   
717   return res;
718 }
719
720 static void
721 async_ready_write_callback_wrapper (GObject      *source_object,
722                                     GAsyncResult *res,
723                                     gpointer      user_data)
724 {
725   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
726   GOutputStreamClass *class;
727   GTask *task = user_data;
728   gssize nwrote;
729   GError *error = NULL;
730
731   g_output_stream_clear_pending (stream);
732   
733   if (g_async_result_legacy_propagate_error (res, &error))
734     nwrote = -1;
735   else
736     {
737       class = G_OUTPUT_STREAM_GET_CLASS (stream);
738       nwrote = class->write_finish (stream, res, &error);
739     }
740
741   if (nwrote >= 0)
742     g_task_return_int (task, nwrote);
743   else
744     g_task_return_error (task, error);
745   g_object_unref (task);
746 }
747
748 /**
749  * g_output_stream_write_async:
750  * @stream: A #GOutputStream.
751  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
752  * @count: the number of bytes to write
753  * @io_priority: the io priority of the request.
754  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
755  * @callback: (scope async): callback to call when the request is satisfied
756  * @user_data: (closure): the data to pass to callback function
757  *
758  * Request an asynchronous write of @count bytes from @buffer into 
759  * the stream. When the operation is finished @callback will be called.
760  * You can then call g_output_stream_write_finish() to get the result of the 
761  * operation.
762  *
763  * During an async request no other sync and async calls are allowed, 
764  * and will result in %G_IO_ERROR_PENDING errors. 
765  *
766  * A value of @count larger than %G_MAXSSIZE will cause a 
767  * %G_IO_ERROR_INVALID_ARGUMENT error.
768  *
769  * On success, the number of bytes written will be passed to the
770  * @callback. It is not an error if this is not the same as the 
771  * requested size, as it can happen e.g. on a partial I/O error, 
772  * but generally we try to write as many bytes as requested. 
773  *
774  * You are guaranteed that this method will never fail with
775  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
776  * method will just wait until this changes.
777  *
778  * Any outstanding I/O request with higher priority (lower numerical 
779  * value) will be executed before an outstanding request with lower 
780  * priority. Default priority is %G_PRIORITY_DEFAULT.
781  *
 * The asynchronous methods have a default fallback that uses threads 
783  * to implement asynchronicity, so they are optional for inheriting 
784  * classes. However, if you override one you must override all.
785  *
786  * For the synchronous, blocking version of this function, see 
787  * g_output_stream_write().
788  *
789  * <warning><para>No copy of @buffer will be made, so it must stay valid until
790  * @callback is called. See g_output_stream_write_bytes_async() for a #GBytes
791  * version that will automatically hold a reference to the contents (without
792  * copying) for the duration of the call.</para></warning>
793  **/
794 void
795 g_output_stream_write_async (GOutputStream       *stream,
796                              const void          *buffer,
797                              gsize                count,
798                              int                  io_priority,
799                              GCancellable        *cancellable,
800                              GAsyncReadyCallback  callback,
801                              gpointer             user_data)
802 {
803   GOutputStreamClass *class;
804   GError *error = NULL;
805   GTask *task;
806
807   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
808   g_return_if_fail (buffer != NULL);
809
810   task = g_task_new (stream, cancellable, callback, user_data);
811   g_task_set_source_tag (task, g_output_stream_write_async);
812   g_task_set_priority (task, io_priority);
813
814   if (count == 0)
815     {
816       g_task_return_int (task, 0);
817       g_object_unref (task);
818       return;
819     }
820
821   if (((gssize) count) < 0)
822     {
823       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
824                                _("Too large count value passed to %s"),
825                                G_STRFUNC);
826       g_object_unref (task);
827       return;
828     }
829
830   if (!g_output_stream_set_pending (stream, &error))
831     {
832       g_task_return_error (task, error);
833       g_object_unref (task);
834       return;
835     }
836   
837   class = G_OUTPUT_STREAM_GET_CLASS (stream);
838
839   class->write_async (stream, buffer, count, io_priority, cancellable,
840                       async_ready_write_callback_wrapper, task);
841 }
842
843 /**
844  * g_output_stream_write_finish:
845  * @stream: a #GOutputStream.
846  * @result: a #GAsyncResult.
847  * @error: a #GError location to store the error occurring, or %NULL to 
848  * ignore.
849  * 
850  * Finishes a stream write operation.
851  * 
852  * Returns: a #gssize containing the number of bytes written to the stream.
853  **/
854 gssize
855 g_output_stream_write_finish (GOutputStream  *stream,
856                               GAsyncResult   *result,
857                               GError        **error)
858 {
859   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
860   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
861   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_write_async), FALSE);
862
863   /* @result is always the GTask created by g_output_stream_write_async();
864    * we called class->write_finish() from async_ready_write_callback_wrapper.
865    */
866   return g_task_propagate_int (G_TASK (result), error);
867 }
868
869 static void
870 write_bytes_callback (GObject      *stream,
871                       GAsyncResult *result,
872                       gpointer      user_data)
873 {
874   GTask *task = user_data;
875   GError *error = NULL;
876   gssize nwrote;
877
878   nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream),
879                                          result, &error);
880   if (nwrote == -1)
881     g_task_return_error (task, error);
882   else
883     g_task_return_int (task, nwrote);
884   g_object_unref (task);
885 }
886
887 /**
888  * g_output_stream_write_bytes_async:
889  * @stream: A #GOutputStream.
890  * @bytes: The bytes to write
891  * @io_priority: the io priority of the request.
892  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
893  * @callback: (scope async): callback to call when the request is satisfied
894  * @user_data: (closure): the data to pass to callback function
895  *
896  * This function is similar to g_output_stream_write_async(), but
897  * takes a #GBytes as input.  Due to the refcounted nature of #GBytes,
898  * this allows the stream to avoid taking a copy of the data.
899  *
900  * However, note that this function <emphasis>may</emphasis> still
901  * perform partial writes, just like g_output_stream_write_async().
902  * If that occurs, to continue writing, you will need to create a new
903  * #GBytes containing just the remaining bytes, using
904  * g_bytes_new_from_bytes().  Passing the same #GBytes instance
905  * multiple times potentially can result in duplicated data in the
906  * output stream.
907  *
908  * For the synchronous, blocking version of this function, see
909  * g_output_stream_write_bytes().
910  **/
911 void
912 g_output_stream_write_bytes_async (GOutputStream       *stream,
913                                    GBytes              *bytes,
914                                    int                  io_priority,
915                                    GCancellable        *cancellable,
916                                    GAsyncReadyCallback  callback,
917                                    gpointer             user_data)
918 {
919   GTask *task;
920   gsize size;
921   gconstpointer data;
922
923   data = g_bytes_get_data (bytes, &size);
924
925   task = g_task_new (stream, cancellable, callback, user_data);
926   g_task_set_task_data (task, g_bytes_ref (bytes),
927                         (GDestroyNotify) g_bytes_unref);
928
929   g_output_stream_write_async (stream,
930                                data, size,
931                                io_priority,
932                                cancellable,
933                                write_bytes_callback,
934                                task);
935 }
936
937 /**
938  * g_output_stream_write_bytes_finish:
939  * @stream: a #GOutputStream.
940  * @result: a #GAsyncResult.
941  * @error: a #GError location to store the error occurring, or %NULL to
942  * ignore.
943  *
944  * Finishes a stream write-from-#GBytes operation.
945  *
946  * Returns: a #gssize containing the number of bytes written to the stream.
947  **/
948 gssize
949 g_output_stream_write_bytes_finish (GOutputStream  *stream,
950                                     GAsyncResult   *result,
951                                     GError        **error)
952 {
953   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
954   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
955
956   return g_task_propagate_int (G_TASK (result), error);
957 }
958
959 static void
960 async_ready_splice_callback_wrapper (GObject      *source_object,
961                                      GAsyncResult *res,
962                                      gpointer     _data)
963 {
964   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
965   GOutputStreamClass *class;
966   GTask *task = _data;
967   gssize nspliced;
968   GError *error = NULL;
969
970   g_output_stream_clear_pending (stream);
971   
972   if (g_async_result_legacy_propagate_error (res, &error))
973     nspliced = -1;
974   else
975     {
976       class = G_OUTPUT_STREAM_GET_CLASS (stream);
977       nspliced = class->splice_finish (stream, res, &error);
978     }
979
980   if (nspliced >= 0)
981     g_task_return_int (task, nspliced);
982   else
983     g_task_return_error (task, error);
984   g_object_unref (task);
985 }
986
987 /**
988  * g_output_stream_splice_async:
989  * @stream: a #GOutputStream.
990  * @source: a #GInputStream. 
991  * @flags: a set of #GOutputStreamSpliceFlags.
992  * @io_priority: the io priority of the request.
993  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
994  * @callback: (scope async): a #GAsyncReadyCallback. 
995  * @user_data: (closure): user data passed to @callback.
996  * 
997  * Splices a stream asynchronously.
998  * When the operation is finished @callback will be called.
999  * You can then call g_output_stream_splice_finish() to get the 
1000  * result of the operation.
1001  *
1002  * For the synchronous, blocking version of this function, see 
1003  * g_output_stream_splice().
1004  **/
1005 void
1006 g_output_stream_splice_async (GOutputStream            *stream,
1007                               GInputStream             *source,
1008                               GOutputStreamSpliceFlags  flags,
1009                               int                       io_priority,
1010                               GCancellable             *cancellable,
1011                               GAsyncReadyCallback       callback,
1012                               gpointer                  user_data)
1013 {
1014   GOutputStreamClass *class;
1015   GTask *task;
1016   GError *error = NULL;
1017
1018   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1019   g_return_if_fail (G_IS_INPUT_STREAM (source));
1020
1021   task = g_task_new (stream, cancellable, callback, user_data);
1022   g_task_set_source_tag (task, g_output_stream_splice_async);
1023   g_task_set_priority (task, io_priority);
1024   g_task_set_task_data (task, g_object_ref (source), g_object_unref);
1025
1026   if (g_input_stream_is_closed (source))
1027     {
1028       g_task_return_new_error (task,
1029                                G_IO_ERROR, G_IO_ERROR_CLOSED,
1030                                _("Source stream is already closed"));
1031       g_object_unref (task);
1032       return;
1033     }
1034   
1035   if (!g_output_stream_set_pending (stream, &error))
1036     {
1037       g_task_return_error (task, error);
1038       g_object_unref (task);
1039       return;
1040     }
1041
1042   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1043
1044   class->splice_async (stream, source, flags, io_priority, cancellable,
1045                        async_ready_splice_callback_wrapper, task);
1046 }
1047
1048 /**
1049  * g_output_stream_splice_finish:
1050  * @stream: a #GOutputStream.
1051  * @result: a #GAsyncResult.
1052  * @error: a #GError location to store the error occurring, or %NULL to 
1053  * ignore.
1054  *
1055  * Finishes an asynchronous stream splice operation.
1056  * 
1057  * Returns: a #gssize of the number of bytes spliced. Note that if the
1058  *     number of bytes spliced is greater than %G_MAXSSIZE, then that
1059  *     will be returned, and there is no way to determine the actual
1060  *     number of bytes spliced.
1061  **/
1062 gssize
1063 g_output_stream_splice_finish (GOutputStream  *stream,
1064                                GAsyncResult   *result,
1065                                GError        **error)
1066 {
1067   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1068   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1069   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_splice_async), FALSE);
1070
1071   /* @result is always the GTask created by g_output_stream_splice_async();
1072    * we called class->splice_finish() from async_ready_splice_callback_wrapper.
1073    */
1074   return g_task_propagate_int (G_TASK (result), error);
1075 }
1076
1077 static void
1078 async_ready_flush_callback_wrapper (GObject      *source_object,
1079                                     GAsyncResult *res,
1080                                     gpointer      user_data)
1081 {
1082   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1083   GOutputStreamClass *class;
1084   GTask *task = user_data;
1085   gboolean flushed;
1086   GError *error = NULL;
1087
1088   g_output_stream_clear_pending (stream);
1089   
1090   if (g_async_result_legacy_propagate_error (res, &error))
1091     flushed = FALSE;
1092   else
1093     {
1094       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1095       flushed = class->flush_finish (stream, res, &error);
1096     }
1097
1098   if (flushed)
1099     g_task_return_boolean (task, TRUE);
1100   else
1101     g_task_return_error (task, error);
1102   g_object_unref (task);
1103 }
1104
1105 /**
1106  * g_output_stream_flush_async:
1107  * @stream: a #GOutputStream.
1108  * @io_priority: the io priority of the request.
1109  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
1110  * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied
1111  * @user_data: (closure): the data to pass to callback function
1112  * 
1113  * Forces an asynchronous write of all user-space buffered data for
1114  * the given @stream.
1115  * For behaviour details see g_output_stream_flush().
1116  *
1117  * When the operation is finished @callback will be 
1118  * called. You can then call g_output_stream_flush_finish() to get the 
1119  * result of the operation.
1120  **/
1121 void
1122 g_output_stream_flush_async (GOutputStream       *stream,
1123                              int                  io_priority,
1124                              GCancellable        *cancellable,
1125                              GAsyncReadyCallback  callback,
1126                              gpointer             user_data)
1127 {
1128   GOutputStreamClass *class;
1129   GTask *task;
1130   GError *error = NULL;
1131
1132   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1133
1134   task = g_task_new (stream, cancellable, callback, user_data);
1135   g_task_set_source_tag (task, g_output_stream_flush_async);
1136   g_task_set_priority (task, io_priority);
1137
1138   if (!g_output_stream_set_pending (stream, &error))
1139     {
1140       g_task_return_error (task, error);
1141       g_object_unref (task);
1142       return;
1143     }
1144
1145   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1146   
1147   if (class->flush_async == NULL)
1148     {
1149       g_task_return_boolean (task, TRUE);
1150       g_object_unref (task);
1151       return;
1152     }
1153       
1154   class->flush_async (stream, io_priority, cancellable,
1155                       async_ready_flush_callback_wrapper, task);
1156 }
1157
1158 /**
1159  * g_output_stream_flush_finish:
1160  * @stream: a #GOutputStream.
1161  * @result: a GAsyncResult.
1162  * @error: a #GError location to store the error occurring, or %NULL to 
1163  * ignore.
1164  * 
1165  * Finishes flushing an output stream.
1166  * 
1167  * Returns: %TRUE if flush operation succeeded, %FALSE otherwise.
1168  **/
1169 gboolean
1170 g_output_stream_flush_finish (GOutputStream  *stream,
1171                               GAsyncResult   *result,
1172                               GError        **error)
1173 {
1174   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1175   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1176   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_flush_async), FALSE);
1177
1178   /* @result is always the GTask created by g_output_stream_flush_async();
1179    * we called class->flush_finish() from async_ready_flush_callback_wrapper.
1180    */
1181   return g_task_propagate_boolean (G_TASK (result), error);
1182 }
1183
1184
1185 static void
1186 async_ready_close_callback_wrapper (GObject      *source_object,
1187                                     GAsyncResult *res,
1188                                     gpointer      user_data)
1189 {
1190   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1191   GOutputStreamClass *class;
1192   GTask *task = user_data;
1193   GError *error = g_task_get_task_data (task);
1194
1195   stream->priv->closing = FALSE;
1196   stream->priv->closed = TRUE;
1197
1198   if (!error && !g_async_result_legacy_propagate_error (res, &error))
1199     {
1200       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1201
1202       class->close_finish (stream, res,
1203                            error ? NULL : &error);
1204     }
1205
1206   if (error != NULL)
1207     g_task_return_error (task, error);
1208   else
1209     g_task_return_boolean (task, TRUE);
1210   g_object_unref (task);
1211 }
1212
1213 static void
1214 async_ready_close_flushed_callback_wrapper (GObject      *source_object,
1215                                             GAsyncResult *res,
1216                                             gpointer      user_data)
1217 {
1218   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1219   GOutputStreamClass *class;
1220   GTask *task = user_data;
1221   GError *error = NULL;
1222
1223   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1224
1225   if (!g_async_result_legacy_propagate_error (res, &error))
1226     {
1227       class->flush_finish (stream, res, &error);
1228     }
1229
1230   /* propagate the possible error */
1231   if (error)
1232     g_task_set_task_data (task, error, NULL);
1233
1234   /* we still close, even if there was a flush error */
1235   class->close_async (stream,
1236                       g_task_get_priority (task),
1237                       g_task_get_cancellable (task),
1238                       async_ready_close_callback_wrapper, task);
1239 }
1240
1241 static void
1242 real_close_async_cb (GObject      *source_object,
1243                      GAsyncResult *res,
1244                      gpointer      user_data)
1245 {
1246   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1247   GTask *task = user_data;
1248   GError *error = NULL;
1249   gboolean ret;
1250
1251   g_output_stream_clear_pending (stream);
1252
1253   ret = g_output_stream_internal_close_finish (stream, res, &error);
1254
1255   if (error != NULL)
1256     g_task_return_error (task, error);
1257   else
1258     g_task_return_boolean (task, ret);
1259
1260   g_object_unref (task);
1261 }
1262
1263 /**
1264  * g_output_stream_close_async:
1265  * @stream: A #GOutputStream.
1266  * @io_priority: the io priority of the request.
1267  * @cancellable: (allow-none): optional cancellable object
1268  * @callback: (scope async): callback to call when the request is satisfied
1269  * @user_data: (closure): the data to pass to callback function
1270  *
1271  * Requests an asynchronous close of the stream, releasing resources 
1272  * related to it. When the operation is finished @callback will be 
1273  * called. You can then call g_output_stream_close_finish() to get 
1274  * the result of the operation.
1275  *
1276  * For behaviour details see g_output_stream_close().
1277  *
1278  * The asyncronous methods have a default fallback that uses threads 
1279  * to implement asynchronicity, so they are optional for inheriting 
1280  * classes. However, if you override one you must override all.
1281  **/
1282 void
1283 g_output_stream_close_async (GOutputStream       *stream,
1284                              int                  io_priority,
1285                              GCancellable        *cancellable,
1286                              GAsyncReadyCallback  callback,
1287                              gpointer             user_data)
1288 {
1289   GTask *task;
1290   GError *error = NULL;
1291
1292   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1293   
1294   task = g_task_new (stream, cancellable, callback, user_data);
1295   g_task_set_source_tag (task, g_output_stream_close_async);
1296   g_task_set_priority (task, io_priority);
1297
1298   if (!g_output_stream_set_pending (stream, &error))
1299     {
1300       g_task_return_error (task, error);
1301       g_object_unref (task);
1302       return;
1303     }
1304
1305   g_output_stream_internal_close_async (stream, io_priority, cancellable,
1306                                         real_close_async_cb, task);
1307 }
1308
1309 /* Must always be called inside
1310  * g_output_stream_set_pending()/g_output_stream_clear_pending().
1311  */
1312 void
1313 g_output_stream_internal_close_async (GOutputStream       *stream,
1314                                       int                  io_priority,
1315                                       GCancellable        *cancellable,
1316                                       GAsyncReadyCallback  callback,
1317                                       gpointer             user_data)
1318 {
1319   GOutputStreamClass *class;
1320   GTask *task;
1321
1322   task = g_task_new (stream, cancellable, callback, user_data);
1323   g_task_set_source_tag (task, g_output_stream_internal_close_async);
1324   g_task_set_priority (task, io_priority);
1325
1326   if (stream->priv->closed)
1327     {
1328       g_task_return_boolean (task, TRUE);
1329       g_object_unref (task);
1330       return;
1331     }
1332
1333   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1334   stream->priv->closing = TRUE;
1335
1336   /* Call close_async directly if there is no need to flush, or if the flush
1337      can be done sync (in the output stream async close thread) */
1338   if (class->flush_async == NULL ||
1339       (class->flush_async == g_output_stream_real_flush_async &&
1340        (class->flush == NULL || class->close_async == g_output_stream_real_close_async)))
1341     {
1342       class->close_async (stream, io_priority, cancellable,
1343                           async_ready_close_callback_wrapper, task);
1344     }
1345   else
1346     {
1347       /* First do an async flush, then do the async close in the callback
1348          wrapper (see async_ready_close_flushed_callback_wrapper) */
1349       class->flush_async (stream, io_priority, cancellable,
1350                           async_ready_close_flushed_callback_wrapper, task);
1351     }
1352 }
1353
1354 static gboolean
1355 g_output_stream_internal_close_finish (GOutputStream  *stream,
1356                                        GAsyncResult   *result,
1357                                        GError        **error)
1358 {
1359   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1360   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1361   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_internal_close_async), FALSE);
1362
1363   return g_task_propagate_boolean (G_TASK (result), error);
1364 }
1365
1366 /**
1367  * g_output_stream_close_finish:
1368  * @stream: a #GOutputStream.
1369  * @result: a #GAsyncResult.
1370  * @error: a #GError location to store the error occurring, or %NULL to 
1371  * ignore.
1372  * 
1373  * Closes an output stream.
1374  * 
1375  * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
1376  **/
1377 gboolean
1378 g_output_stream_close_finish (GOutputStream  *stream,
1379                               GAsyncResult   *result,
1380                               GError        **error)
1381 {
1382   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1383   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1384   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_close_async), FALSE);
1385
1386   /* @result is always the GTask created by g_output_stream_close_async();
1387    * we called class->close_finish() from async_ready_close_callback_wrapper.
1388    */
1389   return g_task_propagate_boolean (G_TASK (result), error);
1390 }
1391
1392 /**
1393  * g_output_stream_is_closed:
1394  * @stream: a #GOutputStream.
1395  * 
1396  * Checks if an output stream has already been closed.
1397  * 
1398  * Returns: %TRUE if @stream is closed. %FALSE otherwise. 
1399  **/
1400 gboolean
1401 g_output_stream_is_closed (GOutputStream *stream)
1402 {
1403   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1404   
1405   return stream->priv->closed;
1406 }
1407
1408 /**
1409  * g_output_stream_is_closing:
1410  * @stream: a #GOutputStream.
1411  *
1412  * Checks if an output stream is being closed. This can be
1413  * used inside e.g. a flush implementation to see if the
1414  * flush (or other i/o operation) is called from within
1415  * the closing operation.
1416  *
1417  * Returns: %TRUE if @stream is being closed. %FALSE otherwise.
1418  *
1419  * Since: 2.24
1420  **/
1421 gboolean
1422 g_output_stream_is_closing (GOutputStream *stream)
1423 {
1424   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1425
1426   return stream->priv->closing;
1427 }
1428
1429 /**
1430  * g_output_stream_has_pending:
1431  * @stream: a #GOutputStream.
1432  * 
1433  * Checks if an ouput stream has pending actions.
1434  * 
1435  * Returns: %TRUE if @stream has pending actions. 
1436  **/
1437 gboolean
1438 g_output_stream_has_pending (GOutputStream *stream)
1439 {
1440   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1441   
1442   return stream->priv->pending;
1443 }
1444
1445 /**
1446  * g_output_stream_set_pending:
1447  * @stream: a #GOutputStream.
1448  * @error: a #GError location to store the error occurring, or %NULL to 
1449  * ignore.
1450  * 
1451  * Sets @stream to have actions pending. If the pending flag is
1452  * already set or @stream is closed, it will return %FALSE and set
1453  * @error.
1454  *
1455  * Return value: %TRUE if pending was previously unset and is now set.
1456  **/
1457 gboolean
1458 g_output_stream_set_pending (GOutputStream *stream,
1459                              GError **error)
1460 {
1461   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1462   
1463   if (stream->priv->closed)
1464     {
1465       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1466                            _("Stream is already closed"));
1467       return FALSE;
1468     }
1469   
1470   if (stream->priv->pending)
1471     {
1472       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1473                            /* Translators: This is an error you get if there is
1474                             * already an operation running against this stream when
1475                             * you try to start one */
1476                            _("Stream has outstanding operation"));
1477       return FALSE;
1478     }
1479   
1480   stream->priv->pending = TRUE;
1481   return TRUE;
1482 }
1483
1484 /**
1485  * g_output_stream_clear_pending:
1486  * @stream: output stream
1487  * 
1488  * Clears the pending flag on @stream.
1489  **/
1490 void
1491 g_output_stream_clear_pending (GOutputStream *stream)
1492 {
1493   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1494   
1495   stream->priv->pending = FALSE;
1496 }
1497
1498 /**
1499  * g_output_stream_async_write_is_via_threads:
1500  * @stream: a #GOutputStream.
1501  *
1502  * Checks if an ouput stream's write_async function uses threads.
1503  *
1504  * Returns: %TRUE if @stream's write_async function uses threads.
1505  **/
1506 gboolean
1507 g_output_stream_async_write_is_via_threads (GOutputStream *stream)
1508 {
1509   GOutputStreamClass *class;
1510
1511   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1512
1513   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1514
1515   return (class->write_async == g_output_stream_real_write_async &&
1516       !(G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
1517         g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
1518 }
1519
1520
1521 /********************************************
1522  *   Default implementation of async ops    *
1523  ********************************************/
1524
/* Per-operation state of the default write_async implementation; attached
 * to the GTask as task data and released with free_write_data().
 */
typedef struct {
  const void         *buffer;           /* caller's data; not copied or owned */
  gsize               count_requested;  /* number of bytes the caller asked to write */
  gssize              count_written;    /* NOTE(review): not referenced by the code visible in this section */
} WriteData;
1530
1531 static void
1532 free_write_data (WriteData *op)
1533 {
1534   g_slice_free (WriteData, op);
1535 }
1536
1537 static void
1538 write_async_thread (GTask        *task,
1539                     gpointer      source_object,
1540                     gpointer      task_data,
1541                     GCancellable *cancellable)
1542 {
1543   GOutputStream *stream = source_object;
1544   WriteData *op = task_data;
1545   GOutputStreamClass *class;
1546   GError *error = NULL;
1547   gssize count_written;
1548
1549   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1550   count_written = class->write_fn (stream, op->buffer, op->count_requested,
1551                                    cancellable, &error);
1552   if (count_written == -1)
1553     g_task_return_error (task, error);
1554   else
1555     g_task_return_int (task, count_written);
1556 }
1557
1558 static void write_async_pollable (GPollableOutputStream *stream,
1559                                   GTask                 *task);
1560
1561 static gboolean
1562 write_async_pollable_ready (GPollableOutputStream *stream,
1563                             gpointer               user_data)
1564 {
1565   GTask *task = user_data;
1566
1567   write_async_pollable (stream, task);
1568   return FALSE;
1569 }
1570
1571 static void
1572 write_async_pollable (GPollableOutputStream *stream,
1573                       GTask                 *task)
1574 {
1575   GError *error = NULL;
1576   WriteData *op = g_task_get_task_data (task);
1577   gssize count_written;
1578
1579   if (g_task_return_error_if_cancelled (task))
1580     return;
1581
1582   count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
1583     write_nonblocking (stream, op->buffer, op->count_requested, &error);
1584
1585   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1586     {
1587       GSource *source;
1588
1589       g_error_free (error);
1590
1591       source = g_pollable_output_stream_create_source (stream,
1592                                                        g_task_get_cancellable (task));
1593       g_task_attach_source (task, source,
1594                             (GSourceFunc) write_async_pollable_ready);
1595       g_source_unref (source);
1596       return;
1597     }
1598
1599   if (count_written == -1)
1600     g_task_return_error (task, error);
1601   else
1602     g_task_return_int (task, count_written);
1603 }
1604
1605 static void
1606 g_output_stream_real_write_async (GOutputStream       *stream,
1607                                   const void          *buffer,
1608                                   gsize                count,
1609                                   int                  io_priority,
1610                                   GCancellable        *cancellable,
1611                                   GAsyncReadyCallback  callback,
1612                                   gpointer             user_data)
1613 {
1614   GTask *task;
1615   WriteData *op;
1616
1617   op = g_slice_new0 (WriteData);
1618   task = g_task_new (stream, cancellable, callback, user_data);
1619   g_task_set_check_cancellable (task, FALSE);
1620   g_task_set_task_data (task, op, (GDestroyNotify) free_write_data);
1621   op->buffer = buffer;
1622   op->count_requested = count;
1623
1624   if (!g_output_stream_async_write_is_via_threads (stream))
1625     write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
1626   else
1627     g_task_run_in_thread (task, write_async_thread);
1628   g_object_unref (task);
1629 }
1630
1631 static gssize
1632 g_output_stream_real_write_finish (GOutputStream  *stream,
1633                                    GAsyncResult   *result,
1634                                    GError        **error)
1635 {
1636   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1637
1638   return g_task_propagate_int (G_TASK (result), error);
1639 }
1640
/* Per-operation state of the default splice_async implementation; released
 * with free_splice_data().
 */
typedef struct {
  GInputStream *source;            /* input stream being spliced; unreffed by free_splice_data() */
  GOutputStreamSpliceFlags flags;  /* checked for the CLOSE_SOURCE / CLOSE_TARGET flags on completion */
  gssize n_read;                   /* NOTE(review): presumably the size of the last read; usage not visible in this section */
  gssize n_written;                /* NOTE(review): presumably the progress of the current write; usage not visible in this section */
  gsize bytes_copied;              /* returned as the task's integer result on success */
  GError *error;                   /* first error seen, if any; returned as the task's error */
  guint8 *buffer;                  /* released with g_free() in free_splice_data() */
} SpliceData;
1650
1651 static void
1652 free_splice_data (SpliceData *op)
1653 {
1654   g_clear_pointer (&op->buffer, g_free);
1655   g_object_unref (op->source);
1656   g_clear_error (&op->error);
1657   g_free (op);
1658 }
1659
1660 static void
1661 real_splice_async_complete_cb (GTask *task)
1662 {
1663   SpliceData *op = g_task_get_task_data (task);
1664
1665   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE &&
1666       !g_input_stream_is_closed (op->source))
1667     return;
1668
1669   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET &&
1670       !g_output_stream_is_closed (g_task_get_source_object (task)))
1671     return;
1672
1673   if (op->error != NULL)
1674     {
1675       g_task_return_error (task, op->error);
1676       op->error = NULL;
1677     }
1678   else
1679     {
1680       g_task_return_int (task, op->bytes_copied);
1681     }
1682
1683   g_object_unref (task);
1684 }
1685
1686 static void
1687 real_splice_async_close_input_cb (GObject      *source,
1688                                   GAsyncResult *res,
1689                                   gpointer      user_data)
1690 {
1691   GTask *task = user_data;
1692
1693   g_input_stream_close_finish (G_INPUT_STREAM (source), res, NULL);
1694
1695   real_splice_async_complete_cb (task);
1696 }
1697
1698 static void
1699 real_splice_async_close_output_cb (GObject      *source,
1700                                    GAsyncResult *res,
1701                                    gpointer      user_data)
1702 {
1703   GTask *task = G_TASK (user_data);
1704   SpliceData *op = g_task_get_task_data (task);
1705   GError **error = (op->error == NULL) ? &op->error : NULL;
1706
1707   g_output_stream_internal_close_finish (G_OUTPUT_STREAM (source), res, error);
1708
1709   real_splice_async_complete_cb (task);
1710 }
1711
1712 static void
1713 real_splice_async_complete (GTask *task)
1714 {
1715   SpliceData *op = g_task_get_task_data (task);
1716   gboolean done = TRUE;
1717
1718   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
1719     {
1720       done = FALSE;
1721       g_input_stream_close_async (op->source, g_task_get_priority (task),
1722                                   g_task_get_cancellable (task),
1723                                   real_splice_async_close_input_cb, task);
1724     }
1725
1726   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
1727     {
1728       done = FALSE;
1729       g_output_stream_internal_close_async (g_task_get_source_object (task),
1730                                             g_task_get_priority (task),
1731                                             g_task_get_cancellable (task),
1732                                             real_splice_async_close_output_cb,
1733                                             task);
1734     }
1735
1736   if (done)
1737     real_splice_async_complete_cb (task);
1738 }
1739
1740 static void real_splice_async_read_cb (GObject      *source,
1741                                        GAsyncResult *res,
1742                                        gpointer      user_data);
1743
1744 static void
1745 real_splice_async_write_cb (GObject      *source,
1746                             GAsyncResult *res,
1747                             gpointer      user_data)
1748 {
1749   GOutputStreamClass *class;
1750   GTask *task = G_TASK (user_data);
1751   SpliceData *op = g_task_get_task_data (task);
1752   gssize ret;
1753
1754   class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
1755
1756   ret = class->write_finish (G_OUTPUT_STREAM (source), res, &op->error);
1757
1758   if (ret == -1)
1759     {
1760       real_splice_async_complete (task);
1761       return;
1762     }
1763
1764   op->n_written += ret;
1765   op->bytes_copied += ret;
1766   if (op->bytes_copied > G_MAXSSIZE)
1767     op->bytes_copied = G_MAXSSIZE;
1768
1769   if (op->n_written < op->n_read)
1770     {
1771       class->write_async (g_task_get_source_object (task),
1772                           op->buffer + op->n_written,
1773                           op->n_read - op->n_written,
1774                           g_task_get_priority (task),
1775                           g_task_get_cancellable (task),
1776                           real_splice_async_write_cb, task);
1777       return;
1778     }
1779
1780   g_input_stream_read_async (op->source, op->buffer, 8192,
1781                              g_task_get_priority (task),
1782                              g_task_get_cancellable (task),
1783                              real_splice_async_read_cb, task);
1784 }
1785
1786 static void
1787 real_splice_async_read_cb (GObject      *source,
1788                            GAsyncResult *res,
1789                            gpointer      user_data)
1790 {
1791   GOutputStreamClass *class;
1792   GTask *task = G_TASK (user_data);
1793   SpliceData *op = g_task_get_task_data (task);
1794   gssize ret;
1795
1796   class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
1797
1798   ret = g_input_stream_read_finish (op->source, res, &op->error);
1799   if (ret == -1 || ret == 0)
1800     {
1801       real_splice_async_complete (task);
1802       return;
1803     }
1804
1805   op->n_read = ret;
1806   op->n_written = 0;
1807
1808   class->write_async (g_task_get_source_object (task), op->buffer,
1809                       op->n_read, g_task_get_priority (task),
1810                       g_task_get_cancellable (task),
1811                       real_splice_async_write_cb, task);
1812 }
1813
1814 static void
1815 splice_async_thread (GTask        *task,
1816                      gpointer      source_object,
1817                      gpointer      task_data,
1818                      GCancellable *cancellable)
1819 {
1820   GOutputStream *stream = source_object;
1821   SpliceData *op = task_data;
1822   GOutputStreamClass *class;
1823   GError *error = NULL;
1824   gssize bytes_copied;
1825
1826   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1827   
1828   bytes_copied = class->splice (stream,
1829                                 op->source,
1830                                 op->flags,
1831                                 cancellable,
1832                                 &error);
1833   if (bytes_copied == -1)
1834     g_task_return_error (task, error);
1835   else
1836     g_task_return_int (task, bytes_copied);
1837 }
1838
1839 static void
1840 g_output_stream_real_splice_async (GOutputStream             *stream,
1841                                    GInputStream              *source,
1842                                    GOutputStreamSpliceFlags   flags,
1843                                    int                        io_priority,
1844                                    GCancellable              *cancellable,
1845                                    GAsyncReadyCallback        callback,
1846                                    gpointer                   user_data)
1847 {
1848   GTask *task;
1849   SpliceData *op;
1850
1851   op = g_new0 (SpliceData, 1);
1852   task = g_task_new (stream, cancellable, callback, user_data);
1853   g_task_set_task_data (task, op, (GDestroyNotify)free_splice_data);
1854   op->flags = flags;
1855   op->source = g_object_ref (source);
1856
1857   if (g_input_stream_async_read_is_via_threads (source) &&
1858       g_output_stream_async_write_is_via_threads (stream))
1859     {
1860       g_task_run_in_thread (task, splice_async_thread);
1861       g_object_unref (task);
1862     }
1863   else
1864     {
1865       op->buffer = g_malloc (8192);
1866       g_input_stream_read_async (op->source, op->buffer, 8192,
1867                                  g_task_get_priority (task),
1868                                  g_task_get_cancellable (task),
1869                                  real_splice_async_read_cb, task);
1870     }
1871 }
1872
1873 static gssize
1874 g_output_stream_real_splice_finish (GOutputStream  *stream,
1875                                     GAsyncResult   *result,
1876                                     GError        **error)
1877 {
1878   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1879
1880   return g_task_propagate_int (G_TASK (result), error);
1881 }
1882
1883
1884 static void
1885 flush_async_thread (GTask        *task,
1886                     gpointer      source_object,
1887                     gpointer      task_data,
1888                     GCancellable *cancellable)
1889 {
1890   GOutputStream *stream = source_object;
1891   GOutputStreamClass *class;
1892   gboolean result;
1893   GError *error = NULL;
1894
1895   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1896   result = TRUE;
1897   if (class->flush)
1898     result = class->flush (stream, cancellable, &error);
1899
1900   if (result)
1901     g_task_return_boolean (task, TRUE);
1902   else
1903     g_task_return_error (task, error);
1904 }
1905
1906 static void
1907 g_output_stream_real_flush_async (GOutputStream       *stream,
1908                                   int                  io_priority,
1909                                   GCancellable        *cancellable,
1910                                   GAsyncReadyCallback  callback,
1911                                   gpointer             user_data)
1912 {
1913   GTask *task;
1914
1915   task = g_task_new (stream, cancellable, callback, user_data);
1916   g_task_set_priority (task, io_priority);
1917   g_task_run_in_thread (task, flush_async_thread);
1918   g_object_unref (task);
1919 }
1920
1921 static gboolean
1922 g_output_stream_real_flush_finish (GOutputStream  *stream,
1923                                    GAsyncResult   *result,
1924                                    GError        **error)
1925 {
1926   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1927
1928   return g_task_propagate_boolean (G_TASK (result), error);
1929 }
1930
1931 static void
1932 close_async_thread (GTask        *task,
1933                     gpointer      source_object,
1934                     gpointer      task_data,
1935                     GCancellable *cancellable)
1936 {
1937   GOutputStream *stream = source_object;
1938   GOutputStreamClass *class;
1939   GError *error = NULL;
1940   gboolean result = TRUE;
1941
1942   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1943
1944   /* Do a flush here if there is a flush function, and we did not have to do
1945    * an async flush before (see g_output_stream_close_async)
1946    */
1947   if (class->flush != NULL &&
1948       (class->flush_async == NULL ||
1949        class->flush_async == g_output_stream_real_flush_async))
1950     {
1951       result = class->flush (stream, cancellable, &error);
1952     }
1953
1954   /* Auto handling of cancelation disabled, and ignore
1955      cancellation, since we want to close things anyway, although
1956      possibly in a quick-n-dirty way. At least we never want to leak
1957      open handles */
1958
1959   if (class->close_fn)
1960     {
1961       /* Make sure to close, even if the flush failed (see sync close) */
1962       if (!result)
1963         class->close_fn (stream, cancellable, NULL);
1964       else
1965         result = class->close_fn (stream, cancellable, &error);
1966     }
1967
1968   if (result)
1969     g_task_return_boolean (task, TRUE);
1970   else
1971     g_task_return_error (task, error);
1972 }
1973
1974 static void
1975 g_output_stream_real_close_async (GOutputStream       *stream,
1976                                   int                  io_priority,
1977                                   GCancellable        *cancellable,
1978                                   GAsyncReadyCallback  callback,
1979                                   gpointer             user_data)
1980 {
1981   GTask *task;
1982
1983   task = g_task_new (stream, cancellable, callback, user_data);
1984   g_task_set_priority (task, io_priority);
1985   g_task_run_in_thread (task, close_async_thread);
1986   g_object_unref (task);
1987 }
1988
1989 static gboolean
1990 g_output_stream_real_close_finish (GOutputStream  *stream,
1991                                    GAsyncResult   *result,
1992                                    GError        **error)
1993 {
1994   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1995
1996   return g_task_propagate_boolean (G_TASK (result), error);
1997 }