Document clearly async functions not copying its args
[platform/upstream/glib.git] / gio / goutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24 #include <string.h>
25 #include "goutputstream.h"
26 #include "gcancellable.h"
27 #include "gasyncresult.h"
28 #include "gtask.h"
29 #include "ginputstream.h"
30 #include "gioerror.h"
31 #include "gioprivate.h"
32 #include "glibintl.h"
33 #include "gpollableoutputstream.h"
34
35 /**
36  * SECTION:goutputstream
37  * @short_description: Base class for implementing streaming output
38  * @include: gio/gio.h
39  *
40  * #GOutputStream has functions to write to a stream (g_output_stream_write()),
41  * to close a stream (g_output_stream_close()) and to flush pending writes
42  * (g_output_stream_flush()). 
43  *
44  * To copy the content of an input stream to an output stream without 
45  * manually handling the reads and writes, use g_output_stream_splice(). 
46  *
47  * All of these functions have async variants too.
48  **/
49
50 struct _GOutputStreamPrivate {
51   guint closed : 1;
52   guint pending : 1;
53   guint closing : 1;
54   GAsyncReadyCallback outstanding_callback;
55 };
56
57 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GOutputStream, g_output_stream, G_TYPE_OBJECT)
58
59 static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
60                                                     GInputStream              *source,
61                                                     GOutputStreamSpliceFlags   flags,
62                                                     GCancellable              *cancellable,
63                                                     GError                   **error);
64 static void     g_output_stream_real_write_async   (GOutputStream             *stream,
65                                                     const void                *buffer,
66                                                     gsize                      count,
67                                                     int                        io_priority,
68                                                     GCancellable              *cancellable,
69                                                     GAsyncReadyCallback        callback,
70                                                     gpointer                   data);
71 static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
72                                                     GAsyncResult              *result,
73                                                     GError                   **error);
74 static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
75                                                     GInputStream              *source,
76                                                     GOutputStreamSpliceFlags   flags,
77                                                     int                        io_priority,
78                                                     GCancellable              *cancellable,
79                                                     GAsyncReadyCallback        callback,
80                                                     gpointer                   data);
81 static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
82                                                     GAsyncResult              *result,
83                                                     GError                   **error);
84 static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
85                                                     int                        io_priority,
86                                                     GCancellable              *cancellable,
87                                                     GAsyncReadyCallback        callback,
88                                                     gpointer                   data);
89 static gboolean g_output_stream_real_flush_finish  (GOutputStream             *stream,
90                                                     GAsyncResult              *result,
91                                                     GError                   **error);
92 static void     g_output_stream_real_close_async   (GOutputStream             *stream,
93                                                     int                        io_priority,
94                                                     GCancellable              *cancellable,
95                                                     GAsyncReadyCallback        callback,
96                                                     gpointer                   data);
97 static gboolean g_output_stream_real_close_finish  (GOutputStream             *stream,
98                                                     GAsyncResult              *result,
99                                                     GError                   **error);
100 static gboolean g_output_stream_internal_close     (GOutputStream             *stream,
101                                                     GCancellable              *cancellable,
102                                                     GError                   **error);
103 static void     g_output_stream_internal_close_async (GOutputStream           *stream,
104                                                       int                      io_priority,
105                                                       GCancellable            *cancellable,
106                                                       GAsyncReadyCallback      callback,
107                                                       gpointer                 data);
108 static gboolean g_output_stream_internal_close_finish (GOutputStream          *stream,
109                                                        GAsyncResult           *result,
110                                                        GError                **error);
111
112 static void
113 g_output_stream_dispose (GObject *object)
114 {
115   GOutputStream *stream;
116
117   stream = G_OUTPUT_STREAM (object);
118   
119   if (!stream->priv->closed)
120     g_output_stream_close (stream, NULL, NULL);
121
122   G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object);
123 }
124
125 static void
126 g_output_stream_class_init (GOutputStreamClass *klass)
127 {
128   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
129
130   gobject_class->dispose = g_output_stream_dispose;
131
132   klass->splice = g_output_stream_real_splice;
133   
134   klass->write_async = g_output_stream_real_write_async;
135   klass->write_finish = g_output_stream_real_write_finish;
136   klass->splice_async = g_output_stream_real_splice_async;
137   klass->splice_finish = g_output_stream_real_splice_finish;
138   klass->flush_async = g_output_stream_real_flush_async;
139   klass->flush_finish = g_output_stream_real_flush_finish;
140   klass->close_async = g_output_stream_real_close_async;
141   klass->close_finish = g_output_stream_real_close_finish;
142 }
143
144 static void
145 g_output_stream_init (GOutputStream *stream)
146 {
147   stream->priv = g_output_stream_get_instance_private (stream);
148 }
149
150 /**
151  * g_output_stream_write:
152  * @stream: a #GOutputStream.
153  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
154  * @count: the number of bytes to write
155  * @cancellable: (allow-none): optional cancellable object
156  * @error: location to store the error occurring, or %NULL to ignore
157  *
158  * Tries to write @count bytes from @buffer into the stream. Will block
159  * during the operation.
160  * 
161  * If count is 0, returns 0 and does nothing. A value of @count
162  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
163  *
164  * On success, the number of bytes written to the stream is returned.
165  * It is not an error if this is not the same as the requested size, as it
166  * can happen e.g. on a partial I/O error, or if there is not enough
167  * storage in the stream. All writes block until at least one byte
168  * is written or an error occurs; 0 is never returned (unless
169  * @count is 0).
170  * 
171  * If @cancellable is not %NULL, then the operation can be cancelled by
172  * triggering the cancellable object from another thread. If the operation
173  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
174  * operation was partially finished when the operation was cancelled the
175  * partial result will be returned, without an error.
176  *
177  * On error -1 is returned and @error is set accordingly.
178  * 
179  * Virtual: write_fn
180  *
181  * Return value: Number of bytes written, or -1 on error
182  **/
183 gssize
184 g_output_stream_write (GOutputStream  *stream,
185                        const void     *buffer,
186                        gsize           count,
187                        GCancellable   *cancellable,
188                        GError        **error)
189 {
190   GOutputStreamClass *class;
191   gssize res;
192
193   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
194   g_return_val_if_fail (buffer != NULL, 0);
195
196   if (count == 0)
197     return 0;
198   
199   if (((gssize) count) < 0)
200     {
201       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
202                    _("Too large count value passed to %s"), G_STRFUNC);
203       return -1;
204     }
205
206   class = G_OUTPUT_STREAM_GET_CLASS (stream);
207
208   if (class->write_fn == NULL) 
209     {
210       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
211                            _("Output stream doesn't implement write"));
212       return -1;
213     }
214   
215   if (!g_output_stream_set_pending (stream, error))
216     return -1;
217   
218   if (cancellable)
219     g_cancellable_push_current (cancellable);
220   
221   res = class->write_fn (stream, buffer, count, cancellable, error);
222   
223   if (cancellable)
224     g_cancellable_pop_current (cancellable);
225   
226   g_output_stream_clear_pending (stream);
227
228   return res; 
229 }
230
231 /**
232  * g_output_stream_write_all:
233  * @stream: a #GOutputStream.
234  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
235  * @count: the number of bytes to write
236  * @bytes_written: (out): location to store the number of bytes that was 
237  *     written to the stream
238  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
239  * @error: location to store the error occurring, or %NULL to ignore
240  *
241  * Tries to write @count bytes from @buffer into the stream. Will block
242  * during the operation.
243  * 
244  * This function is similar to g_output_stream_write(), except it tries to
245  * write as many bytes as requested, only stopping on an error.
246  *
247  * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
248  * is set to @count.
249  * 
250  * If there is an error during the operation %FALSE is returned and @error
251  * is set to indicate the error status, @bytes_written is updated to contain
252  * the number of bytes written into the stream before the error occurred.
253  *
254  * Return value: %TRUE on success, %FALSE if there was an error
255  **/
256 gboolean
257 g_output_stream_write_all (GOutputStream  *stream,
258                            const void     *buffer,
259                            gsize           count,
260                            gsize          *bytes_written,
261                            GCancellable   *cancellable,
262                            GError        **error)
263 {
264   gsize _bytes_written;
265   gssize res;
266
267   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
268   g_return_val_if_fail (buffer != NULL, FALSE);
269
270   _bytes_written = 0;
271   while (_bytes_written < count)
272     {
273       res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
274                                    cancellable, error);
275       if (res == -1)
276         {
277           if (bytes_written)
278             *bytes_written = _bytes_written;
279           return FALSE;
280         }
281       
282       if (res == 0)
283         g_warning ("Write returned zero without error");
284
285       _bytes_written += res;
286     }
287   
288   if (bytes_written)
289     *bytes_written = _bytes_written;
290
291   return TRUE;
292 }
293
294 /**
295  * g_output_stream_printf:
296  * @stream: a #GOutputStream.
297  * @bytes_written: (out): location to store the number of bytes that was
298  *     written to the stream
299  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
300  * @error: location to store the error occurring, or %NULL to ignore
301  * @format: the format string. See the printf() documentation
302  * @...: the parameters to insert into the format string
303  *
304  * This is a utility function around g_output_stream_write_all(). It
305  * uses g_strdup_vprintf() to turn @format and @... into a string that
306  * is then written to @stream.
307  *
308  * See the documentation of g_output_stream_write_all() about the
309  * behavior of the actual write operation.
310  *
311  * Note that partial writes cannot be properly checked with this
312  * function due to the variable length of the written string, if you
313  * need precise control over partial write failures, you need to
314  * create you own printf()-like wrapper around g_output_stream_write()
315  * or g_output_stream_write_all().
316  *
317  * Since: 2.40
318  *
319  * Return value: %TRUE on success, %FALSE if there was an error
320  **/
321 gboolean
322 g_output_stream_printf (GOutputStream  *stream,
323                         gsize          *bytes_written,
324                         GCancellable   *cancellable,
325                         GError        **error,
326                         const gchar    *format,
327                         ...)
328 {
329   va_list  args;
330   gboolean success;
331
332   va_start (args, format);
333   success = g_output_stream_vprintf (stream, bytes_written, cancellable,
334                                      error, format, args);
335   va_end (args);
336
337   return success;
338 }
339
340 /**
341  * g_output_stream_vprintf:
342  * @stream: a #GOutputStream.
343  * @bytes_written: (out): location to store the number of bytes that was
344  *     written to the stream
345  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
346  * @error: location to store the error occurring, or %NULL to ignore
347  * @format: the format string. See the printf() documentation
348  * @args: the parameters to insert into the format string
349  *
350  * This is a utility function around g_output_stream_write_all(). It
351  * uses g_strdup_vprintf() to turn @format and @args into a string that
352  * is then written to @stream.
353  *
354  * See the documentation of g_output_stream_write_all() about the
355  * behavior of the actual write operation.
356  *
357  * Note that partial writes cannot be properly checked with this
358  * function due to the variable length of the written string, if you
359  * need precise control over partial write failures, you need to
360  * create you own printf()-like wrapper around g_output_stream_write()
361  * or g_output_stream_write_all().
362  *
363  * Since: 2.40
364  *
365  * Return value: %TRUE on success, %FALSE if there was an error
366  **/
367 gboolean
368 g_output_stream_vprintf (GOutputStream  *stream,
369                          gsize          *bytes_written,
370                          GCancellable   *cancellable,
371                          GError        **error,
372                          const gchar    *format,
373                          va_list         args)
374 {
375   gchar    *text;
376   gboolean  success;
377
378   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
379   g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (stream), FALSE);
380   g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
381   g_return_val_if_fail (format != NULL, FALSE);
382
383   text = g_strdup_vprintf (format, args);
384   success = g_output_stream_write_all (stream,
385                                        text, strlen (text),
386                                        bytes_written, cancellable, error);
387   g_free (text);
388
389   return success;
390 }
391
392 /**
393  * g_output_stream_write_bytes:
394  * @stream: a #GOutputStream.
395  * @bytes: the #GBytes to write
396  * @cancellable: (allow-none): optional cancellable object
397  * @error: location to store the error occurring, or %NULL to ignore
398  *
399  * A wrapper function for g_output_stream_write() which takes a
400  * #GBytes as input.  This can be more convenient for use by language
401  * bindings or in other cases where the refcounted nature of #GBytes
402  * is helpful over a bare pointer interface.
403  *
404  * However, note that this function <emphasis>may</emphasis> still
405  * perform partial writes, just like g_output_stream_write().  If that
406  * occurs, to continue writing, you will need to create a new #GBytes
407  * containing just the remaining bytes, using
408  * g_bytes_new_from_bytes().  Passing the same #GBytes instance
409  * multiple times potentially can result in duplicated data in the
410  * output stream.
411  *
412  * Return value: Number of bytes written, or -1 on error
413  **/
414 gssize
415 g_output_stream_write_bytes (GOutputStream  *stream,
416                              GBytes         *bytes,
417                              GCancellable   *cancellable,
418                              GError        **error)
419 {
420   gsize size;
421   gconstpointer data;
422
423   data = g_bytes_get_data (bytes, &size);
424
425   return g_output_stream_write (stream,
426                                 data, size,
427                                 cancellable,
428                                 error);
429 }
430
431 /**
432  * g_output_stream_flush:
433  * @stream: a #GOutputStream.
434  * @cancellable: (allow-none): optional cancellable object
435  * @error: location to store the error occurring, or %NULL to ignore
436  *
437  * Forces a write of all user-space buffered data for the given
438  * @stream. Will block during the operation. Closing the stream will
439  * implicitly cause a flush.
440  *
441  * This function is optional for inherited classes.
442  * 
443  * If @cancellable is not %NULL, then the operation can be cancelled by
444  * triggering the cancellable object from another thread. If the operation
445  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
446  *
447  * Return value: %TRUE on success, %FALSE on error
448  **/
449 gboolean
450 g_output_stream_flush (GOutputStream  *stream,
451                        GCancellable   *cancellable,
452                        GError        **error)
453 {
454   GOutputStreamClass *class;
455   gboolean res;
456
457   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
458
459   if (!g_output_stream_set_pending (stream, error))
460     return FALSE;
461   
462   class = G_OUTPUT_STREAM_GET_CLASS (stream);
463
464   res = TRUE;
465   if (class->flush)
466     {
467       if (cancellable)
468         g_cancellable_push_current (cancellable);
469       
470       res = class->flush (stream, cancellable, error);
471       
472       if (cancellable)
473         g_cancellable_pop_current (cancellable);
474     }
475   
476   g_output_stream_clear_pending (stream);
477
478   return res;
479 }
480
481 /**
482  * g_output_stream_splice:
483  * @stream: a #GOutputStream.
484  * @source: a #GInputStream.
485  * @flags: a set of #GOutputStreamSpliceFlags.
486  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
487  * @error: a #GError location to store the error occurring, or %NULL to
488  * ignore.
489  *
490  * Splices an input stream into an output stream.
491  *
492  * Returns: a #gssize containing the size of the data spliced, or
493  *     -1 if an error occurred. Note that if the number of bytes
494  *     spliced is greater than %G_MAXSSIZE, then that will be
495  *     returned, and there is no way to determine the actual number
496  *     of bytes spliced.
497  **/
498 gssize
499 g_output_stream_splice (GOutputStream             *stream,
500                         GInputStream              *source,
501                         GOutputStreamSpliceFlags   flags,
502                         GCancellable              *cancellable,
503                         GError                   **error)
504 {
505   GOutputStreamClass *class;
506   gssize bytes_copied;
507
508   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
509   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
510
511   if (g_input_stream_is_closed (source))
512     {
513       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
514                            _("Source stream is already closed"));
515       return -1;
516     }
517
518   if (!g_output_stream_set_pending (stream, error))
519     return -1;
520
521   class = G_OUTPUT_STREAM_GET_CLASS (stream);
522
523   if (cancellable)
524     g_cancellable_push_current (cancellable);
525
526   bytes_copied = class->splice (stream, source, flags, cancellable, error);
527
528   if (cancellable)
529     g_cancellable_pop_current (cancellable);
530
531   g_output_stream_clear_pending (stream);
532
533   return bytes_copied;
534 }
535
536 static gssize
537 g_output_stream_real_splice (GOutputStream             *stream,
538                              GInputStream              *source,
539                              GOutputStreamSpliceFlags   flags,
540                              GCancellable              *cancellable,
541                              GError                   **error)
542 {
543   GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
544   gssize n_read, n_written;
545   gsize bytes_copied;
546   char buffer[8192], *p;
547   gboolean res;
548
549   bytes_copied = 0;
550   if (class->write_fn == NULL)
551     {
552       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
553                            _("Output stream doesn't implement write"));
554       res = FALSE;
555       goto notsupported;
556     }
557
558   res = TRUE;
559   do
560     {
561       n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
562       if (n_read == -1)
563         {
564           res = FALSE;
565           break;
566         }
567
568       if (n_read == 0)
569         break;
570
571       p = buffer;
572       while (n_read > 0)
573         {
574           n_written = class->write_fn (stream, p, n_read, cancellable, error);
575           if (n_written == -1)
576             {
577               res = FALSE;
578               break;
579             }
580
581           p += n_written;
582           n_read -= n_written;
583           bytes_copied += n_written;
584         }
585
586       if (bytes_copied > G_MAXSSIZE)
587         bytes_copied = G_MAXSSIZE;
588     }
589   while (res);
590
591  notsupported:
592   if (!res)
593     error = NULL; /* Ignore further errors */
594
595   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
596     {
597       /* Don't care about errors in source here */
598       g_input_stream_close (source, cancellable, NULL);
599     }
600
601   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
602     {
603       /* But write errors on close are bad! */
604       res = g_output_stream_internal_close (stream, cancellable, error);
605     }
606
607   if (res)
608     return bytes_copied;
609
610   return -1;
611 }
612
613 /* Must always be called inside
614  * g_output_stream_set_pending()/g_output_stream_clear_pending(). */
615 static gboolean
616 g_output_stream_internal_close (GOutputStream  *stream,
617                                 GCancellable   *cancellable,
618                                 GError        **error)
619 {
620   GOutputStreamClass *class;
621   gboolean res;
622
623   if (stream->priv->closed)
624     return TRUE;
625
626   class = G_OUTPUT_STREAM_GET_CLASS (stream);
627
628   stream->priv->closing = TRUE;
629
630   if (cancellable)
631     g_cancellable_push_current (cancellable);
632
633   if (class->flush)
634     res = class->flush (stream, cancellable, error);
635   else
636     res = TRUE;
637
638   if (!res)
639     {
640       /* flushing caused the error that we want to return,
641        * but we still want to close the underlying stream if possible
642        */
643       if (class->close_fn)
644         class->close_fn (stream, cancellable, NULL);
645     }
646   else
647     {
648       res = TRUE;
649       if (class->close_fn)
650         res = class->close_fn (stream, cancellable, error);
651     }
652
653   if (cancellable)
654     g_cancellable_pop_current (cancellable);
655
656   stream->priv->closing = FALSE;
657   stream->priv->closed = TRUE;
658
659   return res;
660 }
661
662 /**
663  * g_output_stream_close:
664  * @stream: A #GOutputStream.
665  * @cancellable: (allow-none): optional cancellable object
666  * @error: location to store the error occurring, or %NULL to ignore
667  *
668  * Closes the stream, releasing resources related to it.
669  *
670  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
671  * Closing a stream multiple times will not return an error.
672  *
673  * Closing a stream will automatically flush any outstanding buffers in the
674  * stream.
675  *
676  * Streams will be automatically closed when the last reference
677  * is dropped, but you might want to call this function to make sure 
678  * resources are released as early as possible.
679  *
680  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
681  * open after the stream is closed. See the documentation for the individual
682  * stream for details.
683  *
684  * On failure the first error that happened will be reported, but the close
685  * operation will finish as much as possible. A stream that failed to
686  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
687  * is important to check and report the error to the user, otherwise
688  * there might be a loss of data as all data might not be written.
689  * 
690  * If @cancellable is not %NULL, then the operation can be cancelled by
691  * triggering the cancellable object from another thread. If the operation
692  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
693  * Cancelling a close will still leave the stream closed, but there some streams
694  * can use a faster close that doesn't block to e.g. check errors. On
695  * cancellation (as with any error) there is no guarantee that all written
696  * data will reach the target. 
697  *
698  * Return value: %TRUE on success, %FALSE on failure
699  **/
700 gboolean
701 g_output_stream_close (GOutputStream  *stream,
702                        GCancellable   *cancellable,
703                        GError        **error)
704 {
705   gboolean res;
706
707   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
708
709   if (stream->priv->closed)
710     return TRUE;
711
712   if (!g_output_stream_set_pending (stream, error))
713     return FALSE;
714
715   res = g_output_stream_internal_close (stream, cancellable, error);
716
717   g_output_stream_clear_pending (stream);
718   
719   return res;
720 }
721
722 static void
723 async_ready_write_callback_wrapper (GObject      *source_object,
724                                     GAsyncResult *res,
725                                     gpointer      user_data)
726 {
727   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
728   GOutputStreamClass *class;
729   GTask *task = user_data;
730   gssize nwrote;
731   GError *error = NULL;
732
733   g_output_stream_clear_pending (stream);
734   
735   if (g_async_result_legacy_propagate_error (res, &error))
736     nwrote = -1;
737   else
738     {
739       class = G_OUTPUT_STREAM_GET_CLASS (stream);
740       nwrote = class->write_finish (stream, res, &error);
741     }
742
743   if (nwrote >= 0)
744     g_task_return_int (task, nwrote);
745   else
746     g_task_return_error (task, error);
747   g_object_unref (task);
748 }
749
750 /**
751  * g_output_stream_write_async:
752  * @stream: A #GOutputStream.
753  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
754  * @count: the number of bytes to write
755  * @io_priority: the io priority of the request.
756  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
757  * @callback: (scope async): callback to call when the request is satisfied
758  * @user_data: (closure): the data to pass to callback function
759  *
760  * Request an asynchronous write of @count bytes from @buffer into 
761  * the stream. When the operation is finished @callback will be called.
762  * You can then call g_output_stream_write_finish() to get the result of the 
763  * operation.
764  *
765  * During an async request no other sync and async calls are allowed, 
766  * and will result in %G_IO_ERROR_PENDING errors. 
767  *
768  * A value of @count larger than %G_MAXSSIZE will cause a 
769  * %G_IO_ERROR_INVALID_ARGUMENT error.
770  *
771  * On success, the number of bytes written will be passed to the
772  * @callback. It is not an error if this is not the same as the 
773  * requested size, as it can happen e.g. on a partial I/O error, 
774  * but generally we try to write as many bytes as requested. 
775  *
776  * You are guaranteed that this method will never fail with
777  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
778  * method will just wait until this changes.
779  *
780  * Any outstanding I/O request with higher priority (lower numerical 
781  * value) will be executed before an outstanding request with lower 
782  * priority. Default priority is %G_PRIORITY_DEFAULT.
783  *
784  * The asyncronous methods have a default fallback that uses threads 
785  * to implement asynchronicity, so they are optional for inheriting 
786  * classes. However, if you override one you must override all.
787  *
788  * For the synchronous, blocking version of this function, see 
789  * g_output_stream_write().
790  *
791  * <warning><para>No copy of @buffer will be made, so it must stay valid until
792  * @callback is called. See g_output_stream_write_bytes_async() for a #GBytes
793  * version that will automatically hold a reference to the contents (without
794  * copying) for the duration of the call.</para></warning>
795  **/
796 void
797 g_output_stream_write_async (GOutputStream       *stream,
798                              const void          *buffer,
799                              gsize                count,
800                              int                  io_priority,
801                              GCancellable        *cancellable,
802                              GAsyncReadyCallback  callback,
803                              gpointer             user_data)
804 {
805   GOutputStreamClass *class;
806   GError *error = NULL;
807   GTask *task;
808
809   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
810   g_return_if_fail (buffer != NULL);
811
812   task = g_task_new (stream, cancellable, callback, user_data);
813   g_task_set_source_tag (task, g_output_stream_write_async);
814   g_task_set_priority (task, io_priority);
815
816   if (count == 0)
817     {
818       g_task_return_int (task, 0);
819       g_object_unref (task);
820       return;
821     }
822
823   if (((gssize) count) < 0)
824     {
825       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
826                                _("Too large count value passed to %s"),
827                                G_STRFUNC);
828       g_object_unref (task);
829       return;
830     }
831
832   if (!g_output_stream_set_pending (stream, &error))
833     {
834       g_task_return_error (task, error);
835       g_object_unref (task);
836       return;
837     }
838   
839   class = G_OUTPUT_STREAM_GET_CLASS (stream);
840
841   class->write_async (stream, buffer, count, io_priority, cancellable,
842                       async_ready_write_callback_wrapper, task);
843 }
844
845 /**
846  * g_output_stream_write_finish:
847  * @stream: a #GOutputStream.
848  * @result: a #GAsyncResult.
849  * @error: a #GError location to store the error occurring, or %NULL to 
850  * ignore.
851  * 
852  * Finishes a stream write operation.
853  * 
854  * Returns: a #gssize containing the number of bytes written to the stream.
855  **/
856 gssize
857 g_output_stream_write_finish (GOutputStream  *stream,
858                               GAsyncResult   *result,
859                               GError        **error)
860 {
861   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
862   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
863   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_write_async), FALSE);
864
865   /* @result is always the GTask created by g_output_stream_write_async();
866    * we called class->write_finish() from async_ready_write_callback_wrapper.
867    */
868   return g_task_propagate_int (G_TASK (result), error);
869 }
870
871 static void
872 write_bytes_callback (GObject      *stream,
873                       GAsyncResult *result,
874                       gpointer      user_data)
875 {
876   GTask *task = user_data;
877   GError *error = NULL;
878   gssize nwrote;
879
880   nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream),
881                                          result, &error);
882   if (nwrote == -1)
883     g_task_return_error (task, error);
884   else
885     g_task_return_int (task, nwrote);
886   g_object_unref (task);
887 }
888
889 /**
890  * g_output_stream_write_bytes_async:
891  * @stream: A #GOutputStream.
892  * @bytes: The bytes to write
893  * @io_priority: the io priority of the request.
894  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
895  * @callback: (scope async): callback to call when the request is satisfied
896  * @user_data: (closure): the data to pass to callback function
897  *
898  * This function is similar to g_output_stream_write_async(), but
899  * takes a #GBytes as input.  Due to the refcounted nature of #GBytes,
900  * this allows the stream to avoid taking a copy of the data.
901  *
902  * However, note that this function <emphasis>may</emphasis> still
903  * perform partial writes, just like g_output_stream_write_async().
904  * If that occurs, to continue writing, you will need to create a new
905  * #GBytes containing just the remaining bytes, using
906  * g_bytes_new_from_bytes().  Passing the same #GBytes instance
907  * multiple times potentially can result in duplicated data in the
908  * output stream.
909  *
910  * For the synchronous, blocking version of this function, see
911  * g_output_stream_write_bytes().
912  **/
913 void
914 g_output_stream_write_bytes_async (GOutputStream       *stream,
915                                    GBytes              *bytes,
916                                    int                  io_priority,
917                                    GCancellable        *cancellable,
918                                    GAsyncReadyCallback  callback,
919                                    gpointer             user_data)
920 {
921   GTask *task;
922   gsize size;
923   gconstpointer data;
924
925   data = g_bytes_get_data (bytes, &size);
926
927   task = g_task_new (stream, cancellable, callback, user_data);
928   g_task_set_task_data (task, g_bytes_ref (bytes),
929                         (GDestroyNotify) g_bytes_unref);
930
931   g_output_stream_write_async (stream,
932                                data, size,
933                                io_priority,
934                                cancellable,
935                                write_bytes_callback,
936                                task);
937 }
938
939 /**
940  * g_output_stream_write_bytes_finish:
941  * @stream: a #GOutputStream.
942  * @result: a #GAsyncResult.
943  * @error: a #GError location to store the error occurring, or %NULL to
944  * ignore.
945  *
946  * Finishes a stream write-from-#GBytes operation.
947  *
948  * Returns: a #gssize containing the number of bytes written to the stream.
949  **/
950 gssize
951 g_output_stream_write_bytes_finish (GOutputStream  *stream,
952                                     GAsyncResult   *result,
953                                     GError        **error)
954 {
955   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
956   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
957
958   return g_task_propagate_int (G_TASK (result), error);
959 }
960
961 static void
962 async_ready_splice_callback_wrapper (GObject      *source_object,
963                                      GAsyncResult *res,
964                                      gpointer     _data)
965 {
966   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
967   GOutputStreamClass *class;
968   GTask *task = _data;
969   gssize nspliced;
970   GError *error = NULL;
971
972   g_output_stream_clear_pending (stream);
973   
974   if (g_async_result_legacy_propagate_error (res, &error))
975     nspliced = -1;
976   else
977     {
978       class = G_OUTPUT_STREAM_GET_CLASS (stream);
979       nspliced = class->splice_finish (stream, res, &error);
980     }
981
982   if (nspliced >= 0)
983     g_task_return_int (task, nspliced);
984   else
985     g_task_return_error (task, error);
986   g_object_unref (task);
987 }
988
989 /**
990  * g_output_stream_splice_async:
991  * @stream: a #GOutputStream.
992  * @source: a #GInputStream. 
993  * @flags: a set of #GOutputStreamSpliceFlags.
994  * @io_priority: the io priority of the request.
995  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
996  * @callback: (scope async): a #GAsyncReadyCallback. 
997  * @user_data: (closure): user data passed to @callback.
998  * 
999  * Splices a stream asynchronously.
1000  * When the operation is finished @callback will be called.
1001  * You can then call g_output_stream_splice_finish() to get the 
1002  * result of the operation.
1003  *
1004  * For the synchronous, blocking version of this function, see 
1005  * g_output_stream_splice().
1006  **/
1007 void
1008 g_output_stream_splice_async (GOutputStream            *stream,
1009                               GInputStream             *source,
1010                               GOutputStreamSpliceFlags  flags,
1011                               int                       io_priority,
1012                               GCancellable             *cancellable,
1013                               GAsyncReadyCallback       callback,
1014                               gpointer                  user_data)
1015 {
1016   GOutputStreamClass *class;
1017   GTask *task;
1018   GError *error = NULL;
1019
1020   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1021   g_return_if_fail (G_IS_INPUT_STREAM (source));
1022
1023   task = g_task_new (stream, cancellable, callback, user_data);
1024   g_task_set_source_tag (task, g_output_stream_splice_async);
1025   g_task_set_priority (task, io_priority);
1026   g_task_set_task_data (task, g_object_ref (source), g_object_unref);
1027
1028   if (g_input_stream_is_closed (source))
1029     {
1030       g_task_return_new_error (task,
1031                                G_IO_ERROR, G_IO_ERROR_CLOSED,
1032                                _("Source stream is already closed"));
1033       g_object_unref (task);
1034       return;
1035     }
1036   
1037   if (!g_output_stream_set_pending (stream, &error))
1038     {
1039       g_task_return_error (task, error);
1040       g_object_unref (task);
1041       return;
1042     }
1043
1044   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1045
1046   class->splice_async (stream, source, flags, io_priority, cancellable,
1047                        async_ready_splice_callback_wrapper, task);
1048 }
1049
1050 /**
1051  * g_output_stream_splice_finish:
1052  * @stream: a #GOutputStream.
1053  * @result: a #GAsyncResult.
1054  * @error: a #GError location to store the error occurring, or %NULL to 
1055  * ignore.
1056  *
1057  * Finishes an asynchronous stream splice operation.
1058  * 
1059  * Returns: a #gssize of the number of bytes spliced. Note that if the
1060  *     number of bytes spliced is greater than %G_MAXSSIZE, then that
1061  *     will be returned, and there is no way to determine the actual
1062  *     number of bytes spliced.
1063  **/
1064 gssize
1065 g_output_stream_splice_finish (GOutputStream  *stream,
1066                                GAsyncResult   *result,
1067                                GError        **error)
1068 {
1069   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1070   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1071   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_splice_async), FALSE);
1072
1073   /* @result is always the GTask created by g_output_stream_splice_async();
1074    * we called class->splice_finish() from async_ready_splice_callback_wrapper.
1075    */
1076   return g_task_propagate_int (G_TASK (result), error);
1077 }
1078
1079 static void
1080 async_ready_flush_callback_wrapper (GObject      *source_object,
1081                                     GAsyncResult *res,
1082                                     gpointer      user_data)
1083 {
1084   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1085   GOutputStreamClass *class;
1086   GTask *task = user_data;
1087   gboolean flushed;
1088   GError *error = NULL;
1089
1090   g_output_stream_clear_pending (stream);
1091   
1092   if (g_async_result_legacy_propagate_error (res, &error))
1093     flushed = FALSE;
1094   else
1095     {
1096       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1097       flushed = class->flush_finish (stream, res, &error);
1098     }
1099
1100   if (flushed)
1101     g_task_return_boolean (task, TRUE);
1102   else
1103     g_task_return_error (task, error);
1104   g_object_unref (task);
1105 }
1106
1107 /**
1108  * g_output_stream_flush_async:
1109  * @stream: a #GOutputStream.
1110  * @io_priority: the io priority of the request.
1111  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
1112  * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied
1113  * @user_data: (closure): the data to pass to callback function
1114  * 
1115  * Forces an asynchronous write of all user-space buffered data for
1116  * the given @stream.
1117  * For behaviour details see g_output_stream_flush().
1118  *
1119  * When the operation is finished @callback will be 
1120  * called. You can then call g_output_stream_flush_finish() to get the 
1121  * result of the operation.
1122  **/
1123 void
1124 g_output_stream_flush_async (GOutputStream       *stream,
1125                              int                  io_priority,
1126                              GCancellable        *cancellable,
1127                              GAsyncReadyCallback  callback,
1128                              gpointer             user_data)
1129 {
1130   GOutputStreamClass *class;
1131   GTask *task;
1132   GError *error = NULL;
1133
1134   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1135
1136   task = g_task_new (stream, cancellable, callback, user_data);
1137   g_task_set_source_tag (task, g_output_stream_flush_async);
1138   g_task_set_priority (task, io_priority);
1139
1140   if (!g_output_stream_set_pending (stream, &error))
1141     {
1142       g_task_return_error (task, error);
1143       g_object_unref (task);
1144       return;
1145     }
1146
1147   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1148   
1149   if (class->flush_async == NULL)
1150     {
1151       g_task_return_boolean (task, TRUE);
1152       g_object_unref (task);
1153       return;
1154     }
1155       
1156   class->flush_async (stream, io_priority, cancellable,
1157                       async_ready_flush_callback_wrapper, task);
1158 }
1159
1160 /**
1161  * g_output_stream_flush_finish:
1162  * @stream: a #GOutputStream.
1163  * @result: a GAsyncResult.
1164  * @error: a #GError location to store the error occurring, or %NULL to 
1165  * ignore.
1166  * 
1167  * Finishes flushing an output stream.
1168  * 
1169  * Returns: %TRUE if flush operation succeeded, %FALSE otherwise.
1170  **/
1171 gboolean
1172 g_output_stream_flush_finish (GOutputStream  *stream,
1173                               GAsyncResult   *result,
1174                               GError        **error)
1175 {
1176   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1177   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1178   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_flush_async), FALSE);
1179
1180   /* @result is always the GTask created by g_output_stream_flush_async();
1181    * we called class->flush_finish() from async_ready_flush_callback_wrapper.
1182    */
1183   return g_task_propagate_boolean (G_TASK (result), error);
1184 }
1185
1186
1187 static void
1188 async_ready_close_callback_wrapper (GObject      *source_object,
1189                                     GAsyncResult *res,
1190                                     gpointer      user_data)
1191 {
1192   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1193   GOutputStreamClass *class;
1194   GTask *task = user_data;
1195   GError *error = g_task_get_task_data (task);
1196
1197   stream->priv->closing = FALSE;
1198   stream->priv->closed = TRUE;
1199
1200   if (!error && !g_async_result_legacy_propagate_error (res, &error))
1201     {
1202       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1203
1204       class->close_finish (stream, res,
1205                            error ? NULL : &error);
1206     }
1207
1208   if (error != NULL)
1209     g_task_return_error (task, error);
1210   else
1211     g_task_return_boolean (task, TRUE);
1212   g_object_unref (task);
1213 }
1214
1215 static void
1216 async_ready_close_flushed_callback_wrapper (GObject      *source_object,
1217                                             GAsyncResult *res,
1218                                             gpointer      user_data)
1219 {
1220   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1221   GOutputStreamClass *class;
1222   GTask *task = user_data;
1223   GError *error = NULL;
1224
1225   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1226
1227   if (!g_async_result_legacy_propagate_error (res, &error))
1228     {
1229       class->flush_finish (stream, res, &error);
1230     }
1231
1232   /* propagate the possible error */
1233   if (error)
1234     g_task_set_task_data (task, error, NULL);
1235
1236   /* we still close, even if there was a flush error */
1237   class->close_async (stream,
1238                       g_task_get_priority (task),
1239                       g_task_get_cancellable (task),
1240                       async_ready_close_callback_wrapper, task);
1241 }
1242
1243 static void
1244 real_close_async_cb (GObject      *source_object,
1245                      GAsyncResult *res,
1246                      gpointer      user_data)
1247 {
1248   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1249   GTask *task = user_data;
1250   GError *error = NULL;
1251   gboolean ret;
1252
1253   g_output_stream_clear_pending (stream);
1254
1255   ret = g_output_stream_internal_close_finish (stream, res, &error);
1256
1257   if (error != NULL)
1258     g_task_return_error (task, error);
1259   else
1260     g_task_return_boolean (task, ret);
1261
1262   g_object_unref (task);
1263 }
1264
1265 /**
1266  * g_output_stream_close_async:
1267  * @stream: A #GOutputStream.
1268  * @io_priority: the io priority of the request.
1269  * @cancellable: (allow-none): optional cancellable object
1270  * @callback: (scope async): callback to call when the request is satisfied
1271  * @user_data: (closure): the data to pass to callback function
1272  *
1273  * Requests an asynchronous close of the stream, releasing resources 
1274  * related to it. When the operation is finished @callback will be 
1275  * called. You can then call g_output_stream_close_finish() to get 
1276  * the result of the operation.
1277  *
1278  * For behaviour details see g_output_stream_close().
1279  *
1280  * The asyncronous methods have a default fallback that uses threads 
1281  * to implement asynchronicity, so they are optional for inheriting 
1282  * classes. However, if you override one you must override all.
1283  **/
1284 void
1285 g_output_stream_close_async (GOutputStream       *stream,
1286                              int                  io_priority,
1287                              GCancellable        *cancellable,
1288                              GAsyncReadyCallback  callback,
1289                              gpointer             user_data)
1290 {
1291   GTask *task;
1292   GError *error = NULL;
1293
1294   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1295   
1296   task = g_task_new (stream, cancellable, callback, user_data);
1297   g_task_set_source_tag (task, g_output_stream_close_async);
1298   g_task_set_priority (task, io_priority);
1299
1300   if (!g_output_stream_set_pending (stream, &error))
1301     {
1302       g_task_return_error (task, error);
1303       g_object_unref (task);
1304       return;
1305     }
1306
1307   g_output_stream_internal_close_async (stream, io_priority, cancellable,
1308                                         real_close_async_cb, task);
1309 }
1310
1311 /* Must always be called inside
1312  * g_output_stream_set_pending()/g_output_stream_clear_pending().
1313  */
1314 void
1315 g_output_stream_internal_close_async (GOutputStream       *stream,
1316                                       int                  io_priority,
1317                                       GCancellable        *cancellable,
1318                                       GAsyncReadyCallback  callback,
1319                                       gpointer             user_data)
1320 {
1321   GOutputStreamClass *class;
1322   GTask *task;
1323
1324   task = g_task_new (stream, cancellable, callback, user_data);
1325   g_task_set_source_tag (task, g_output_stream_internal_close_async);
1326   g_task_set_priority (task, io_priority);
1327
1328   if (stream->priv->closed)
1329     {
1330       g_task_return_boolean (task, TRUE);
1331       g_object_unref (task);
1332       return;
1333     }
1334
1335   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1336   stream->priv->closing = TRUE;
1337
1338   /* Call close_async directly if there is no need to flush, or if the flush
1339      can be done sync (in the output stream async close thread) */
1340   if (class->flush_async == NULL ||
1341       (class->flush_async == g_output_stream_real_flush_async &&
1342        (class->flush == NULL || class->close_async == g_output_stream_real_close_async)))
1343     {
1344       class->close_async (stream, io_priority, cancellable,
1345                           async_ready_close_callback_wrapper, task);
1346     }
1347   else
1348     {
1349       /* First do an async flush, then do the async close in the callback
1350          wrapper (see async_ready_close_flushed_callback_wrapper) */
1351       class->flush_async (stream, io_priority, cancellable,
1352                           async_ready_close_flushed_callback_wrapper, task);
1353     }
1354 }
1355
1356 static gboolean
1357 g_output_stream_internal_close_finish (GOutputStream  *stream,
1358                                        GAsyncResult   *result,
1359                                        GError        **error)
1360 {
1361   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1362   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1363   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_internal_close_async), FALSE);
1364
1365   return g_task_propagate_boolean (G_TASK (result), error);
1366 }
1367
1368 /**
1369  * g_output_stream_close_finish:
1370  * @stream: a #GOutputStream.
1371  * @result: a #GAsyncResult.
1372  * @error: a #GError location to store the error occurring, or %NULL to 
1373  * ignore.
1374  * 
1375  * Closes an output stream.
1376  * 
1377  * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
1378  **/
1379 gboolean
1380 g_output_stream_close_finish (GOutputStream  *stream,
1381                               GAsyncResult   *result,
1382                               GError        **error)
1383 {
1384   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1385   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1386   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_close_async), FALSE);
1387
1388   /* @result is always the GTask created by g_output_stream_close_async();
1389    * we called class->close_finish() from async_ready_close_callback_wrapper.
1390    */
1391   return g_task_propagate_boolean (G_TASK (result), error);
1392 }
1393
1394 /**
1395  * g_output_stream_is_closed:
1396  * @stream: a #GOutputStream.
1397  * 
1398  * Checks if an output stream has already been closed.
1399  * 
1400  * Returns: %TRUE if @stream is closed. %FALSE otherwise. 
1401  **/
1402 gboolean
1403 g_output_stream_is_closed (GOutputStream *stream)
1404 {
1405   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1406   
1407   return stream->priv->closed;
1408 }
1409
1410 /**
1411  * g_output_stream_is_closing:
1412  * @stream: a #GOutputStream.
1413  *
1414  * Checks if an output stream is being closed. This can be
1415  * used inside e.g. a flush implementation to see if the
1416  * flush (or other i/o operation) is called from within
1417  * the closing operation.
1418  *
1419  * Returns: %TRUE if @stream is being closed. %FALSE otherwise.
1420  *
1421  * Since: 2.24
1422  **/
1423 gboolean
1424 g_output_stream_is_closing (GOutputStream *stream)
1425 {
1426   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1427
1428   return stream->priv->closing;
1429 }
1430
1431 /**
1432  * g_output_stream_has_pending:
1433  * @stream: a #GOutputStream.
1434  * 
1435  * Checks if an ouput stream has pending actions.
1436  * 
1437  * Returns: %TRUE if @stream has pending actions. 
1438  **/
1439 gboolean
1440 g_output_stream_has_pending (GOutputStream *stream)
1441 {
1442   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1443   
1444   return stream->priv->pending;
1445 }
1446
1447 /**
1448  * g_output_stream_set_pending:
1449  * @stream: a #GOutputStream.
1450  * @error: a #GError location to store the error occurring, or %NULL to 
1451  * ignore.
1452  * 
1453  * Sets @stream to have actions pending. If the pending flag is
1454  * already set or @stream is closed, it will return %FALSE and set
1455  * @error.
1456  *
1457  * Return value: %TRUE if pending was previously unset and is now set.
1458  **/
1459 gboolean
1460 g_output_stream_set_pending (GOutputStream *stream,
1461                              GError **error)
1462 {
1463   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1464   
1465   if (stream->priv->closed)
1466     {
1467       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1468                            _("Stream is already closed"));
1469       return FALSE;
1470     }
1471   
1472   if (stream->priv->pending)
1473     {
1474       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1475                            /* Translators: This is an error you get if there is
1476                             * already an operation running against this stream when
1477                             * you try to start one */
1478                            _("Stream has outstanding operation"));
1479       return FALSE;
1480     }
1481   
1482   stream->priv->pending = TRUE;
1483   return TRUE;
1484 }
1485
1486 /**
1487  * g_output_stream_clear_pending:
1488  * @stream: output stream
1489  * 
1490  * Clears the pending flag on @stream.
1491  **/
1492 void
1493 g_output_stream_clear_pending (GOutputStream *stream)
1494 {
1495   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1496   
1497   stream->priv->pending = FALSE;
1498 }
1499
1500 /**
1501  * g_output_stream_async_write_is_via_threads:
1502  * @stream: a #GOutputStream.
1503  *
1504  * Checks if an ouput stream's write_async function uses threads.
1505  *
1506  * Returns: %TRUE if @stream's write_async function uses threads.
1507  **/
1508 gboolean
1509 g_output_stream_async_write_is_via_threads (GOutputStream *stream)
1510 {
1511   GOutputStreamClass *class;
1512
1513   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1514
1515   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1516
1517   return (class->write_async == g_output_stream_real_write_async &&
1518       !(G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
1519         g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
1520 }
1521
1522
1523 /********************************************
1524  *   Default implementation of async ops    *
1525  ********************************************/
1526
1527 typedef struct {
1528   const void         *buffer;
1529   gsize               count_requested;
1530   gssize              count_written;
1531 } WriteData;
1532
1533 static void
1534 free_write_data (WriteData *op)
1535 {
1536   g_slice_free (WriteData, op);
1537 }
1538
1539 static void
1540 write_async_thread (GTask        *task,
1541                     gpointer      source_object,
1542                     gpointer      task_data,
1543                     GCancellable *cancellable)
1544 {
1545   GOutputStream *stream = source_object;
1546   WriteData *op = task_data;
1547   GOutputStreamClass *class;
1548   GError *error = NULL;
1549   gssize count_written;
1550
1551   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1552   count_written = class->write_fn (stream, op->buffer, op->count_requested,
1553                                    cancellable, &error);
1554   if (count_written == -1)
1555     g_task_return_error (task, error);
1556   else
1557     g_task_return_int (task, count_written);
1558 }
1559
1560 static void write_async_pollable (GPollableOutputStream *stream,
1561                                   GTask                 *task);
1562
1563 static gboolean
1564 write_async_pollable_ready (GPollableOutputStream *stream,
1565                             gpointer               user_data)
1566 {
1567   GTask *task = user_data;
1568
1569   write_async_pollable (stream, task);
1570   return FALSE;
1571 }
1572
1573 static void
1574 write_async_pollable (GPollableOutputStream *stream,
1575                       GTask                 *task)
1576 {
1577   GError *error = NULL;
1578   WriteData *op = g_task_get_task_data (task);
1579   gssize count_written;
1580
1581   if (g_task_return_error_if_cancelled (task))
1582     return;
1583
1584   count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
1585     write_nonblocking (stream, op->buffer, op->count_requested, &error);
1586
1587   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1588     {
1589       GSource *source;
1590
1591       g_error_free (error);
1592
1593       source = g_pollable_output_stream_create_source (stream,
1594                                                        g_task_get_cancellable (task));
1595       g_task_attach_source (task, source,
1596                             (GSourceFunc) write_async_pollable_ready);
1597       g_source_unref (source);
1598       return;
1599     }
1600
1601   if (count_written == -1)
1602     g_task_return_error (task, error);
1603   else
1604     g_task_return_int (task, count_written);
1605 }
1606
1607 static void
1608 g_output_stream_real_write_async (GOutputStream       *stream,
1609                                   const void          *buffer,
1610                                   gsize                count,
1611                                   int                  io_priority,
1612                                   GCancellable        *cancellable,
1613                                   GAsyncReadyCallback  callback,
1614                                   gpointer             user_data)
1615 {
1616   GTask *task;
1617   WriteData *op;
1618
1619   op = g_slice_new0 (WriteData);
1620   task = g_task_new (stream, cancellable, callback, user_data);
1621   g_task_set_check_cancellable (task, FALSE);
1622   g_task_set_task_data (task, op, (GDestroyNotify) free_write_data);
1623   op->buffer = buffer;
1624   op->count_requested = count;
1625
1626   if (!g_output_stream_async_write_is_via_threads (stream))
1627     write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
1628   else
1629     g_task_run_in_thread (task, write_async_thread);
1630   g_object_unref (task);
1631 }
1632
1633 static gssize
1634 g_output_stream_real_write_finish (GOutputStream  *stream,
1635                                    GAsyncResult   *result,
1636                                    GError        **error)
1637 {
1638   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1639
1640   return g_task_propagate_int (G_TASK (result), error);
1641 }
1642
1643 typedef struct {
1644   GInputStream *source;
1645   GOutputStreamSpliceFlags flags;
1646   gssize n_read;
1647   gssize n_written;
1648   gsize bytes_copied;
1649   GError *error;
1650   guint8 *buffer;
1651 } SpliceData;
1652
1653 static void
1654 free_splice_data (SpliceData *op)
1655 {
1656   g_clear_pointer (&op->buffer, g_free);
1657   g_object_unref (op->source);
1658   g_clear_error (&op->error);
1659   g_free (op);
1660 }
1661
1662 static void
1663 real_splice_async_complete_cb (GTask *task)
1664 {
1665   SpliceData *op = g_task_get_task_data (task);
1666
1667   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE &&
1668       !g_input_stream_is_closed (op->source))
1669     return;
1670
1671   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET &&
1672       !g_output_stream_is_closed (g_task_get_source_object (task)))
1673     return;
1674
1675   if (op->error != NULL)
1676     {
1677       g_task_return_error (task, op->error);
1678       op->error = NULL;
1679     }
1680   else
1681     {
1682       g_task_return_int (task, op->bytes_copied);
1683     }
1684
1685   g_object_unref (task);
1686 }
1687
1688 static void
1689 real_splice_async_close_input_cb (GObject      *source,
1690                                   GAsyncResult *res,
1691                                   gpointer      user_data)
1692 {
1693   GTask *task = user_data;
1694
1695   g_input_stream_close_finish (G_INPUT_STREAM (source), res, NULL);
1696
1697   real_splice_async_complete_cb (task);
1698 }
1699
1700 static void
1701 real_splice_async_close_output_cb (GObject      *source,
1702                                    GAsyncResult *res,
1703                                    gpointer      user_data)
1704 {
1705   GTask *task = G_TASK (user_data);
1706   SpliceData *op = g_task_get_task_data (task);
1707   GError **error = (op->error == NULL) ? &op->error : NULL;
1708
1709   g_output_stream_internal_close_finish (G_OUTPUT_STREAM (source), res, error);
1710
1711   real_splice_async_complete_cb (task);
1712 }
1713
1714 static void
1715 real_splice_async_complete (GTask *task)
1716 {
1717   SpliceData *op = g_task_get_task_data (task);
1718   gboolean done = TRUE;
1719
1720   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
1721     {
1722       done = FALSE;
1723       g_input_stream_close_async (op->source, g_task_get_priority (task),
1724                                   g_task_get_cancellable (task),
1725                                   real_splice_async_close_input_cb, task);
1726     }
1727
1728   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
1729     {
1730       done = FALSE;
1731       g_output_stream_internal_close_async (g_task_get_source_object (task),
1732                                             g_task_get_priority (task),
1733                                             g_task_get_cancellable (task),
1734                                             real_splice_async_close_output_cb,
1735                                             task);
1736     }
1737
1738   if (done)
1739     real_splice_async_complete_cb (task);
1740 }
1741
1742 static void real_splice_async_read_cb (GObject      *source,
1743                                        GAsyncResult *res,
1744                                        gpointer      user_data);
1745
1746 static void
1747 real_splice_async_write_cb (GObject      *source,
1748                             GAsyncResult *res,
1749                             gpointer      user_data)
1750 {
1751   GOutputStreamClass *class;
1752   GTask *task = G_TASK (user_data);
1753   SpliceData *op = g_task_get_task_data (task);
1754   gssize ret;
1755
1756   class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
1757
1758   ret = class->write_finish (G_OUTPUT_STREAM (source), res, &op->error);
1759
1760   if (ret == -1)
1761     {
1762       real_splice_async_complete (task);
1763       return;
1764     }
1765
1766   op->n_written += ret;
1767   op->bytes_copied += ret;
1768   if (op->bytes_copied > G_MAXSSIZE)
1769     op->bytes_copied = G_MAXSSIZE;
1770
1771   if (op->n_written < op->n_read)
1772     {
1773       class->write_async (g_task_get_source_object (task),
1774                           op->buffer + op->n_written,
1775                           op->n_read - op->n_written,
1776                           g_task_get_priority (task),
1777                           g_task_get_cancellable (task),
1778                           real_splice_async_write_cb, task);
1779       return;
1780     }
1781
1782   g_input_stream_read_async (op->source, op->buffer, 8192,
1783                              g_task_get_priority (task),
1784                              g_task_get_cancellable (task),
1785                              real_splice_async_read_cb, task);
1786 }
1787
1788 static void
1789 real_splice_async_read_cb (GObject      *source,
1790                            GAsyncResult *res,
1791                            gpointer      user_data)
1792 {
1793   GOutputStreamClass *class;
1794   GTask *task = G_TASK (user_data);
1795   SpliceData *op = g_task_get_task_data (task);
1796   gssize ret;
1797
1798   class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
1799
1800   ret = g_input_stream_read_finish (op->source, res, &op->error);
1801   if (ret == -1 || ret == 0)
1802     {
1803       real_splice_async_complete (task);
1804       return;
1805     }
1806
1807   op->n_read = ret;
1808   op->n_written = 0;
1809
1810   class->write_async (g_task_get_source_object (task), op->buffer,
1811                       op->n_read, g_task_get_priority (task),
1812                       g_task_get_cancellable (task),
1813                       real_splice_async_write_cb, task);
1814 }
1815
1816 static void
1817 splice_async_thread (GTask        *task,
1818                      gpointer      source_object,
1819                      gpointer      task_data,
1820                      GCancellable *cancellable)
1821 {
1822   GOutputStream *stream = source_object;
1823   SpliceData *op = task_data;
1824   GOutputStreamClass *class;
1825   GError *error = NULL;
1826   gssize bytes_copied;
1827
1828   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1829   
1830   bytes_copied = class->splice (stream,
1831                                 op->source,
1832                                 op->flags,
1833                                 cancellable,
1834                                 &error);
1835   if (bytes_copied == -1)
1836     g_task_return_error (task, error);
1837   else
1838     g_task_return_int (task, bytes_copied);
1839 }
1840
1841 static void
1842 g_output_stream_real_splice_async (GOutputStream             *stream,
1843                                    GInputStream              *source,
1844                                    GOutputStreamSpliceFlags   flags,
1845                                    int                        io_priority,
1846                                    GCancellable              *cancellable,
1847                                    GAsyncReadyCallback        callback,
1848                                    gpointer                   user_data)
1849 {
1850   GTask *task;
1851   SpliceData *op;
1852
1853   op = g_new0 (SpliceData, 1);
1854   task = g_task_new (stream, cancellable, callback, user_data);
1855   g_task_set_task_data (task, op, (GDestroyNotify)free_splice_data);
1856   op->flags = flags;
1857   op->source = g_object_ref (source);
1858
1859   if (g_input_stream_async_read_is_via_threads (source) &&
1860       g_output_stream_async_write_is_via_threads (stream))
1861     {
1862       g_task_run_in_thread (task, splice_async_thread);
1863       g_object_unref (task);
1864     }
1865   else
1866     {
1867       op->buffer = g_malloc (8192);
1868       g_input_stream_read_async (op->source, op->buffer, 8192,
1869                                  g_task_get_priority (task),
1870                                  g_task_get_cancellable (task),
1871                                  real_splice_async_read_cb, task);
1872     }
1873 }
1874
1875 static gssize
1876 g_output_stream_real_splice_finish (GOutputStream  *stream,
1877                                     GAsyncResult   *result,
1878                                     GError        **error)
1879 {
1880   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1881
1882   return g_task_propagate_int (G_TASK (result), error);
1883 }
1884
1885
1886 static void
1887 flush_async_thread (GTask        *task,
1888                     gpointer      source_object,
1889                     gpointer      task_data,
1890                     GCancellable *cancellable)
1891 {
1892   GOutputStream *stream = source_object;
1893   GOutputStreamClass *class;
1894   gboolean result;
1895   GError *error = NULL;
1896
1897   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1898   result = TRUE;
1899   if (class->flush)
1900     result = class->flush (stream, cancellable, &error);
1901
1902   if (result)
1903     g_task_return_boolean (task, TRUE);
1904   else
1905     g_task_return_error (task, error);
1906 }
1907
1908 static void
1909 g_output_stream_real_flush_async (GOutputStream       *stream,
1910                                   int                  io_priority,
1911                                   GCancellable        *cancellable,
1912                                   GAsyncReadyCallback  callback,
1913                                   gpointer             user_data)
1914 {
1915   GTask *task;
1916
1917   task = g_task_new (stream, cancellable, callback, user_data);
1918   g_task_set_priority (task, io_priority);
1919   g_task_run_in_thread (task, flush_async_thread);
1920   g_object_unref (task);
1921 }
1922
1923 static gboolean
1924 g_output_stream_real_flush_finish (GOutputStream  *stream,
1925                                    GAsyncResult   *result,
1926                                    GError        **error)
1927 {
1928   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1929
1930   return g_task_propagate_boolean (G_TASK (result), error);
1931 }
1932
1933 static void
1934 close_async_thread (GTask        *task,
1935                     gpointer      source_object,
1936                     gpointer      task_data,
1937                     GCancellable *cancellable)
1938 {
1939   GOutputStream *stream = source_object;
1940   GOutputStreamClass *class;
1941   GError *error = NULL;
1942   gboolean result = TRUE;
1943
1944   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1945
1946   /* Do a flush here if there is a flush function, and we did not have to do
1947    * an async flush before (see g_output_stream_close_async)
1948    */
1949   if (class->flush != NULL &&
1950       (class->flush_async == NULL ||
1951        class->flush_async == g_output_stream_real_flush_async))
1952     {
1953       result = class->flush (stream, cancellable, &error);
1954     }
1955
1956   /* Auto handling of cancelation disabled, and ignore
1957      cancellation, since we want to close things anyway, although
1958      possibly in a quick-n-dirty way. At least we never want to leak
1959      open handles */
1960
1961   if (class->close_fn)
1962     {
1963       /* Make sure to close, even if the flush failed (see sync close) */
1964       if (!result)
1965         class->close_fn (stream, cancellable, NULL);
1966       else
1967         result = class->close_fn (stream, cancellable, &error);
1968     }
1969
1970   if (result)
1971     g_task_return_boolean (task, TRUE);
1972   else
1973     g_task_return_error (task, error);
1974 }
1975
1976 static void
1977 g_output_stream_real_close_async (GOutputStream       *stream,
1978                                   int                  io_priority,
1979                                   GCancellable        *cancellable,
1980                                   GAsyncReadyCallback  callback,
1981                                   gpointer             user_data)
1982 {
1983   GTask *task;
1984
1985   task = g_task_new (stream, cancellable, callback, user_data);
1986   g_task_set_priority (task, io_priority);
1987   g_task_run_in_thread (task, close_async_thread);
1988   g_object_unref (task);
1989 }
1990
1991 static gboolean
1992 g_output_stream_real_close_finish (GOutputStream  *stream,
1993                                    GAsyncResult   *result,
1994                                    GError        **error)
1995 {
1996   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1997
1998   return g_task_propagate_boolean (G_TASK (result), error);
1999 }