GOutputStream: Add g_output_stream_async_write_is_via_threads()
[platform/upstream/glib.git] / gio / goutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24 #include "goutputstream.h"
25 #include "gcancellable.h"
26 #include "gasyncresult.h"
27 #include "gtask.h"
28 #include "ginputstream.h"
29 #include "gioerror.h"
30 #include "gioprivate.h"
31 #include "glibintl.h"
32 #include "gpollableoutputstream.h"
33
34 /**
35  * SECTION:goutputstream
36  * @short_description: Base class for implementing streaming output
37  * @include: gio/gio.h
38  *
39  * #GOutputStream has functions to write to a stream (g_output_stream_write()),
40  * to close a stream (g_output_stream_close()) and to flush pending writes
41  * (g_output_stream_flush()). 
42  *
43  * To copy the content of an input stream to an output stream without 
44  * manually handling the reads and writes, use g_output_stream_splice(). 
45  *
46  * All of these functions have async variants too.
47  **/
48
49 struct _GOutputStreamPrivate {
50   guint closed : 1;
51   guint pending : 1;
52   guint closing : 1;
53   GAsyncReadyCallback outstanding_callback;
54 };
55
56 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GOutputStream, g_output_stream, G_TYPE_OBJECT)
57
58 static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
59                                                     GInputStream              *source,
60                                                     GOutputStreamSpliceFlags   flags,
61                                                     GCancellable              *cancellable,
62                                                     GError                   **error);
63 static void     g_output_stream_real_write_async   (GOutputStream             *stream,
64                                                     const void                *buffer,
65                                                     gsize                      count,
66                                                     int                        io_priority,
67                                                     GCancellable              *cancellable,
68                                                     GAsyncReadyCallback        callback,
69                                                     gpointer                   data);
70 static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
71                                                     GAsyncResult              *result,
72                                                     GError                   **error);
73 static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
74                                                     GInputStream              *source,
75                                                     GOutputStreamSpliceFlags   flags,
76                                                     int                        io_priority,
77                                                     GCancellable              *cancellable,
78                                                     GAsyncReadyCallback        callback,
79                                                     gpointer                   data);
80 static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
81                                                     GAsyncResult              *result,
82                                                     GError                   **error);
83 static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
84                                                     int                        io_priority,
85                                                     GCancellable              *cancellable,
86                                                     GAsyncReadyCallback        callback,
87                                                     gpointer                   data);
88 static gboolean g_output_stream_real_flush_finish  (GOutputStream             *stream,
89                                                     GAsyncResult              *result,
90                                                     GError                   **error);
91 static void     g_output_stream_real_close_async   (GOutputStream             *stream,
92                                                     int                        io_priority,
93                                                     GCancellable              *cancellable,
94                                                     GAsyncReadyCallback        callback,
95                                                     gpointer                   data);
96 static gboolean g_output_stream_real_close_finish  (GOutputStream             *stream,
97                                                     GAsyncResult              *result,
98                                                     GError                   **error);
99 static gboolean _g_output_stream_close_internal    (GOutputStream             *stream,
100                                                     GCancellable              *cancellable,
101                                                     GError                   **error);
102
103 static void
104 g_output_stream_dispose (GObject *object)
105 {
106   GOutputStream *stream;
107
108   stream = G_OUTPUT_STREAM (object);
109   
110   if (!stream->priv->closed)
111     g_output_stream_close (stream, NULL, NULL);
112
113   G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object);
114 }
115
116 static void
117 g_output_stream_class_init (GOutputStreamClass *klass)
118 {
119   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
120
121   gobject_class->dispose = g_output_stream_dispose;
122
123   klass->splice = g_output_stream_real_splice;
124   
125   klass->write_async = g_output_stream_real_write_async;
126   klass->write_finish = g_output_stream_real_write_finish;
127   klass->splice_async = g_output_stream_real_splice_async;
128   klass->splice_finish = g_output_stream_real_splice_finish;
129   klass->flush_async = g_output_stream_real_flush_async;
130   klass->flush_finish = g_output_stream_real_flush_finish;
131   klass->close_async = g_output_stream_real_close_async;
132   klass->close_finish = g_output_stream_real_close_finish;
133 }
134
135 static void
136 g_output_stream_init (GOutputStream *stream)
137 {
138   stream->priv = g_output_stream_get_instance_private (stream);
139 }
140
141 /**
142  * g_output_stream_write:
143  * @stream: a #GOutputStream.
144  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
145  * @count: the number of bytes to write
146  * @cancellable: (allow-none): optional cancellable object
147  * @error: location to store the error occurring, or %NULL to ignore
148  *
149  * Tries to write @count bytes from @buffer into the stream. Will block
150  * during the operation.
151  * 
152  * If count is 0, returns 0 and does nothing. A value of @count
153  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
154  *
155  * On success, the number of bytes written to the stream is returned.
156  * It is not an error if this is not the same as the requested size, as it
157  * can happen e.g. on a partial I/O error, or if there is not enough
158  * storage in the stream. All writes block until at least one byte
159  * is written or an error occurs; 0 is never returned (unless
160  * @count is 0).
161  * 
162  * If @cancellable is not %NULL, then the operation can be cancelled by
163  * triggering the cancellable object from another thread. If the operation
164  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
165  * operation was partially finished when the operation was cancelled the
166  * partial result will be returned, without an error.
167  *
168  * On error -1 is returned and @error is set accordingly.
169  * 
170  * Virtual: write_fn
171  *
172  * Return value: Number of bytes written, or -1 on error
173  **/
174 gssize
175 g_output_stream_write (GOutputStream  *stream,
176                        const void     *buffer,
177                        gsize           count,
178                        GCancellable   *cancellable,
179                        GError        **error)
180 {
181   GOutputStreamClass *class;
182   gssize res;
183
184   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
185   g_return_val_if_fail (buffer != NULL, 0);
186
187   if (count == 0)
188     return 0;
189   
190   if (((gssize) count) < 0)
191     {
192       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
193                    _("Too large count value passed to %s"), G_STRFUNC);
194       return -1;
195     }
196
197   class = G_OUTPUT_STREAM_GET_CLASS (stream);
198
199   if (class->write_fn == NULL) 
200     {
201       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
202                            _("Output stream doesn't implement write"));
203       return -1;
204     }
205   
206   if (!g_output_stream_set_pending (stream, error))
207     return -1;
208   
209   if (cancellable)
210     g_cancellable_push_current (cancellable);
211   
212   res = class->write_fn (stream, buffer, count, cancellable, error);
213   
214   if (cancellable)
215     g_cancellable_pop_current (cancellable);
216   
217   g_output_stream_clear_pending (stream);
218
219   return res; 
220 }
221
222 /**
223  * g_output_stream_write_all:
224  * @stream: a #GOutputStream.
225  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
226  * @count: the number of bytes to write
227  * @bytes_written: (out): location to store the number of bytes that was 
228  *     written to the stream
229  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
230  * @error: location to store the error occurring, or %NULL to ignore
231  *
232  * Tries to write @count bytes from @buffer into the stream. Will block
233  * during the operation.
234  * 
235  * This function is similar to g_output_stream_write(), except it tries to
236  * write as many bytes as requested, only stopping on an error.
237  *
238  * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
239  * is set to @count.
240  * 
241  * If there is an error during the operation %FALSE is returned and @error
242  * is set to indicate the error status, @bytes_written is updated to contain
243  * the number of bytes written into the stream before the error occurred.
244  *
245  * Return value: %TRUE on success, %FALSE if there was an error
246  **/
247 gboolean
248 g_output_stream_write_all (GOutputStream  *stream,
249                            const void     *buffer,
250                            gsize           count,
251                            gsize          *bytes_written,
252                            GCancellable   *cancellable,
253                            GError        **error)
254 {
255   gsize _bytes_written;
256   gssize res;
257
258   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
259   g_return_val_if_fail (buffer != NULL, FALSE);
260
261   _bytes_written = 0;
262   while (_bytes_written < count)
263     {
264       res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
265                                    cancellable, error);
266       if (res == -1)
267         {
268           if (bytes_written)
269             *bytes_written = _bytes_written;
270           return FALSE;
271         }
272       
273       if (res == 0)
274         g_warning ("Write returned zero without error");
275
276       _bytes_written += res;
277     }
278   
279   if (bytes_written)
280     *bytes_written = _bytes_written;
281
282   return TRUE;
283 }
284
285 /**
286  * g_output_stream_write_bytes:
287  * @stream: a #GOutputStream.
288  * @bytes: the #GBytes to write
289  * @cancellable: (allow-none): optional cancellable object
290  * @error: location to store the error occurring, or %NULL to ignore
291  *
292  * Tries to write the data from @bytes into the stream. Will block
293  * during the operation.
294  *
295  * If @bytes is 0-length, returns 0 and does nothing. A #GBytes larger
296  * than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
297  *
298  * On success, the number of bytes written to the stream is returned.
299  * It is not an error if this is not the same as the requested size, as it
300  * can happen e.g. on a partial I/O error, or if there is not enough
301  * storage in the stream. All writes block until at least one byte
302  * is written or an error occurs; 0 is never returned (unless
303  * the size of @bytes is 0).
304  *
305  * If @cancellable is not %NULL, then the operation can be cancelled by
306  * triggering the cancellable object from another thread. If the operation
307  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
308  * operation was partially finished when the operation was cancelled the
309  * partial result will be returned, without an error.
310  *
311  * On error -1 is returned and @error is set accordingly.
312  *
313  * Return value: Number of bytes written, or -1 on error
314  **/
315 gssize
316 g_output_stream_write_bytes (GOutputStream  *stream,
317                              GBytes         *bytes,
318                              GCancellable   *cancellable,
319                              GError        **error)
320 {
321   gsize size;
322   gconstpointer data;
323
324   data = g_bytes_get_data (bytes, &size);
325
326   return g_output_stream_write (stream,
327                                 data, size,
328                                 cancellable,
329                                 error);
330 }
331
332 /**
333  * g_output_stream_flush:
334  * @stream: a #GOutputStream.
335  * @cancellable: (allow-none): optional cancellable object
336  * @error: location to store the error occurring, or %NULL to ignore
337  *
338  * Forces a write of all user-space buffered data for the given
339  * @stream. Will block during the operation. Closing the stream will
340  * implicitly cause a flush.
341  *
342  * This function is optional for inherited classes.
343  * 
344  * If @cancellable is not %NULL, then the operation can be cancelled by
345  * triggering the cancellable object from another thread. If the operation
346  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
347  *
348  * Return value: %TRUE on success, %FALSE on error
349  **/
350 gboolean
351 g_output_stream_flush (GOutputStream  *stream,
352                        GCancellable   *cancellable,
353                        GError        **error)
354 {
355   GOutputStreamClass *class;
356   gboolean res;
357
358   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
359
360   if (!g_output_stream_set_pending (stream, error))
361     return FALSE;
362   
363   class = G_OUTPUT_STREAM_GET_CLASS (stream);
364
365   res = TRUE;
366   if (class->flush)
367     {
368       if (cancellable)
369         g_cancellable_push_current (cancellable);
370       
371       res = class->flush (stream, cancellable, error);
372       
373       if (cancellable)
374         g_cancellable_pop_current (cancellable);
375     }
376   
377   g_output_stream_clear_pending (stream);
378
379   return res;
380 }
381
382 /**
383  * g_output_stream_splice:
384  * @stream: a #GOutputStream.
385  * @source: a #GInputStream.
386  * @flags: a set of #GOutputStreamSpliceFlags.
387  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
388  * @error: a #GError location to store the error occurring, or %NULL to
389  * ignore.
390  *
391  * Splices an input stream into an output stream.
392  *
393  * Returns: a #gssize containing the size of the data spliced, or
394  *     -1 if an error occurred. Note that if the number of bytes
395  *     spliced is greater than %G_MAXSSIZE, then that will be
396  *     returned, and there is no way to determine the actual number
397  *     of bytes spliced.
398  **/
399 gssize
400 g_output_stream_splice (GOutputStream             *stream,
401                         GInputStream              *source,
402                         GOutputStreamSpliceFlags   flags,
403                         GCancellable              *cancellable,
404                         GError                   **error)
405 {
406   GOutputStreamClass *class;
407   gssize bytes_copied;
408
409   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
410   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
411
412   if (g_input_stream_is_closed (source))
413     {
414       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
415                            _("Source stream is already closed"));
416       return -1;
417     }
418
419   if (!g_output_stream_set_pending (stream, error))
420     return -1;
421
422   class = G_OUTPUT_STREAM_GET_CLASS (stream);
423
424   if (cancellable)
425     g_cancellable_push_current (cancellable);
426
427   bytes_copied = class->splice (stream, source, flags, cancellable, error);
428
429   if (cancellable)
430     g_cancellable_pop_current (cancellable);
431
432   g_output_stream_clear_pending (stream);
433
434   return bytes_copied;
435 }
436
437 static gssize
438 g_output_stream_real_splice (GOutputStream             *stream,
439                              GInputStream              *source,
440                              GOutputStreamSpliceFlags   flags,
441                              GCancellable              *cancellable,
442                              GError                   **error)
443 {
444   GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
445   gssize n_read, n_written;
446   gsize bytes_copied;
447   char buffer[8192], *p;
448   gboolean res;
449
450   bytes_copied = 0;
451   if (class->write_fn == NULL)
452     {
453       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
454                            _("Output stream doesn't implement write"));
455       res = FALSE;
456       goto notsupported;
457     }
458
459   res = TRUE;
460   do
461     {
462       n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
463       if (n_read == -1)
464         {
465           res = FALSE;
466           break;
467         }
468
469       if (n_read == 0)
470         break;
471
472       p = buffer;
473       while (n_read > 0)
474         {
475           n_written = class->write_fn (stream, p, n_read, cancellable, error);
476           if (n_written == -1)
477             {
478               res = FALSE;
479               break;
480             }
481
482           p += n_written;
483           n_read -= n_written;
484           bytes_copied += n_written;
485         }
486
487       if (bytes_copied > G_MAXSSIZE)
488         bytes_copied = G_MAXSSIZE;
489     }
490   while (res);
491
492  notsupported:
493   if (!res)
494     error = NULL; /* Ignore further errors */
495
496   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
497     {
498       /* Don't care about errors in source here */
499       g_input_stream_close (source, cancellable, NULL);
500     }
501
502   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
503     {
504       /* But write errors on close are bad! */
505       res = _g_output_stream_close_internal (stream, cancellable, error);
506     }
507
508   if (res)
509     return bytes_copied;
510
511   return -1;
512 }
513
514 /* Must always be called inside
515  * g_output_stream_set_pending()/g_output_stream_clear_pending(). */
516 static gboolean
517 _g_output_stream_close_internal (GOutputStream  *stream,
518                                  GCancellable   *cancellable,
519                                  GError        **error)
520 {
521   GOutputStreamClass *class;
522   gboolean res;
523
524   if (stream->priv->closed)
525     return TRUE;
526
527   class = G_OUTPUT_STREAM_GET_CLASS (stream);
528
529   stream->priv->closing = TRUE;
530
531   if (cancellable)
532     g_cancellable_push_current (cancellable);
533
534   if (class->flush)
535     res = class->flush (stream, cancellable, error);
536   else
537     res = TRUE;
538
539   if (!res)
540     {
541       /* flushing caused the error that we want to return,
542        * but we still want to close the underlying stream if possible
543        */
544       if (class->close_fn)
545         class->close_fn (stream, cancellable, NULL);
546     }
547   else
548     {
549       res = TRUE;
550       if (class->close_fn)
551         res = class->close_fn (stream, cancellable, error);
552     }
553
554   if (cancellable)
555     g_cancellable_pop_current (cancellable);
556
557   stream->priv->closing = FALSE;
558   stream->priv->closed = TRUE;
559
560   return res;
561 }
562
563 /**
564  * g_output_stream_close:
565  * @stream: A #GOutputStream.
566  * @cancellable: (allow-none): optional cancellable object
567  * @error: location to store the error occurring, or %NULL to ignore
568  *
569  * Closes the stream, releasing resources related to it.
570  *
571  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
572  * Closing a stream multiple times will not return an error.
573  *
574  * Closing a stream will automatically flush any outstanding buffers in the
575  * stream.
576  *
577  * Streams will be automatically closed when the last reference
578  * is dropped, but you might want to call this function to make sure 
579  * resources are released as early as possible.
580  *
581  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
582  * open after the stream is closed. See the documentation for the individual
583  * stream for details.
584  *
585  * On failure the first error that happened will be reported, but the close
586  * operation will finish as much as possible. A stream that failed to
587  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
588  * is important to check and report the error to the user, otherwise
589  * there might be a loss of data as all data might not be written.
590  * 
591  * If @cancellable is not %NULL, then the operation can be cancelled by
592  * triggering the cancellable object from another thread. If the operation
593  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
594  * Cancelling a close will still leave the stream closed, but there some streams
595  * can use a faster close that doesn't block to e.g. check errors. On
596  * cancellation (as with any error) there is no guarantee that all written
597  * data will reach the target. 
598  *
599  * Return value: %TRUE on success, %FALSE on failure
600  **/
601 gboolean
602 g_output_stream_close (GOutputStream  *stream,
603                        GCancellable   *cancellable,
604                        GError        **error)
605 {
606   gboolean res;
607
608   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
609
610   if (stream->priv->closed)
611     return TRUE;
612
613   if (!g_output_stream_set_pending (stream, error))
614     return FALSE;
615
616   res = _g_output_stream_close_internal (stream, cancellable, error);
617
618   g_output_stream_clear_pending (stream);
619   
620   return res;
621 }
622
623 static void
624 async_ready_write_callback_wrapper (GObject      *source_object,
625                                     GAsyncResult *res,
626                                     gpointer      user_data)
627 {
628   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
629   GOutputStreamClass *class;
630   GTask *task = user_data;
631   gssize nwrote;
632   GError *error = NULL;
633
634   g_output_stream_clear_pending (stream);
635   
636   if (g_async_result_legacy_propagate_error (res, &error))
637     nwrote = -1;
638   else
639     {
640       class = G_OUTPUT_STREAM_GET_CLASS (stream);
641       nwrote = class->write_finish (stream, res, &error);
642     }
643
644   if (nwrote >= 0)
645     g_task_return_int (task, nwrote);
646   else
647     g_task_return_error (task, error);
648   g_object_unref (task);
649 }
650
651 /**
652  * g_output_stream_write_async:
653  * @stream: A #GOutputStream.
654  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
655  * @count: the number of bytes to write
656  * @io_priority: the io priority of the request.
657  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
658  * @callback: (scope async): callback to call when the request is satisfied
659  * @user_data: (closure): the data to pass to callback function
660  *
661  * Request an asynchronous write of @count bytes from @buffer into 
662  * the stream. When the operation is finished @callback will be called.
663  * You can then call g_output_stream_write_finish() to get the result of the 
664  * operation.
665  *
666  * During an async request no other sync and async calls are allowed, 
667  * and will result in %G_IO_ERROR_PENDING errors. 
668  *
669  * A value of @count larger than %G_MAXSSIZE will cause a 
670  * %G_IO_ERROR_INVALID_ARGUMENT error.
671  *
672  * On success, the number of bytes written will be passed to the
673  * @callback. It is not an error if this is not the same as the 
674  * requested size, as it can happen e.g. on a partial I/O error, 
675  * but generally we try to write as many bytes as requested. 
676  *
677  * You are guaranteed that this method will never fail with
678  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
679  * method will just wait until this changes.
680  *
681  * Any outstanding I/O request with higher priority (lower numerical 
682  * value) will be executed before an outstanding request with lower 
683  * priority. Default priority is %G_PRIORITY_DEFAULT.
684  *
685  * The asyncronous methods have a default fallback that uses threads 
686  * to implement asynchronicity, so they are optional for inheriting 
687  * classes. However, if you override one you must override all.
688  *
689  * For the synchronous, blocking version of this function, see 
690  * g_output_stream_write().
691  **/
692 void
693 g_output_stream_write_async (GOutputStream       *stream,
694                              const void          *buffer,
695                              gsize                count,
696                              int                  io_priority,
697                              GCancellable        *cancellable,
698                              GAsyncReadyCallback  callback,
699                              gpointer             user_data)
700 {
701   GOutputStreamClass *class;
702   GError *error = NULL;
703   GTask *task;
704
705   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
706   g_return_if_fail (buffer != NULL);
707
708   task = g_task_new (stream, cancellable, callback, user_data);
709   g_task_set_source_tag (task, g_output_stream_write_async);
710   g_task_set_priority (task, io_priority);
711
712   if (count == 0)
713     {
714       g_task_return_int (task, 0);
715       g_object_unref (task);
716       return;
717     }
718
719   if (((gssize) count) < 0)
720     {
721       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
722                                _("Too large count value passed to %s"),
723                                G_STRFUNC);
724       g_object_unref (task);
725       return;
726     }
727
728   if (!g_output_stream_set_pending (stream, &error))
729     {
730       g_task_return_error (task, error);
731       g_object_unref (task);
732       return;
733     }
734   
735   class = G_OUTPUT_STREAM_GET_CLASS (stream);
736
737   class->write_async (stream, buffer, count, io_priority, cancellable,
738                       async_ready_write_callback_wrapper, task);
739 }
740
741 /**
742  * g_output_stream_write_finish:
743  * @stream: a #GOutputStream.
744  * @result: a #GAsyncResult.
745  * @error: a #GError location to store the error occurring, or %NULL to 
746  * ignore.
747  * 
748  * Finishes a stream write operation.
749  * 
750  * Returns: a #gssize containing the number of bytes written to the stream.
751  **/
752 gssize
753 g_output_stream_write_finish (GOutputStream  *stream,
754                               GAsyncResult   *result,
755                               GError        **error)
756 {
757   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
758   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
759   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_write_async), FALSE);
760
761   /* @result is always the GTask created by g_output_stream_write_async();
762    * we called class->write_finish() from async_ready_write_callback_wrapper.
763    */
764   return g_task_propagate_int (G_TASK (result), error);
765 }
766
767 static void
768 write_bytes_callback (GObject      *stream,
769                       GAsyncResult *result,
770                       gpointer      user_data)
771 {
772   GTask *task = user_data;
773   GError *error = NULL;
774   gssize nwrote;
775
776   nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream),
777                                          result, &error);
778   if (nwrote == -1)
779     g_task_return_error (task, error);
780   else
781     g_task_return_int (task, nwrote);
782   g_object_unref (task);
783 }
784
785 /**
786  * g_output_stream_write_bytes_async:
787  * @stream: A #GOutputStream.
788  * @bytes: The bytes to write
789  * @io_priority: the io priority of the request.
790  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
791  * @callback: (scope async): callback to call when the request is satisfied
792  * @user_data: (closure): the data to pass to callback function
793  *
794  * Request an asynchronous write of the data in @bytes to the stream.
795  * When the operation is finished @callback will be called. You can
796  * then call g_output_stream_write_bytes_finish() to get the result of
797  * the operation.
798  *
799  * During an async request no other sync and async calls are allowed,
800  * and will result in %G_IO_ERROR_PENDING errors.
801  *
802  * A #GBytes larger than %G_MAXSSIZE will cause a
803  * %G_IO_ERROR_INVALID_ARGUMENT error.
804  *
805  * On success, the number of bytes written will be passed to the
806  * @callback. It is not an error if this is not the same as the
807  * requested size, as it can happen e.g. on a partial I/O error,
808  * but generally we try to write as many bytes as requested.
809  *
810  * You are guaranteed that this method will never fail with
811  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
812  * method will just wait until this changes.
813  *
814  * Any outstanding I/O request with higher priority (lower numerical
815  * value) will be executed before an outstanding request with lower
816  * priority. Default priority is %G_PRIORITY_DEFAULT.
817  *
818  * For the synchronous, blocking version of this function, see
819  * g_output_stream_write_bytes().
820  **/
821 void
822 g_output_stream_write_bytes_async (GOutputStream       *stream,
823                                    GBytes              *bytes,
824                                    int                  io_priority,
825                                    GCancellable        *cancellable,
826                                    GAsyncReadyCallback  callback,
827                                    gpointer             user_data)
828 {
829   GTask *task;
830   gsize size;
831   gconstpointer data;
832
833   data = g_bytes_get_data (bytes, &size);
834
835   task = g_task_new (stream, cancellable, callback, user_data);
836   g_task_set_task_data (task, g_bytes_ref (bytes),
837                         (GDestroyNotify) g_bytes_unref);
838
839   g_output_stream_write_async (stream,
840                                data, size,
841                                io_priority,
842                                cancellable,
843                                write_bytes_callback,
844                                task);
845 }
846
847 /**
848  * g_output_stream_write_bytes_finish:
849  * @stream: a #GOutputStream.
850  * @result: a #GAsyncResult.
851  * @error: a #GError location to store the error occurring, or %NULL to
852  * ignore.
853  *
854  * Finishes a stream write-from-#GBytes operation.
855  *
856  * Returns: a #gssize containing the number of bytes written to the stream.
857  **/
858 gssize
859 g_output_stream_write_bytes_finish (GOutputStream  *stream,
860                                     GAsyncResult   *result,
861                                     GError        **error)
862 {
863   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
864   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
865
866   return g_task_propagate_int (G_TASK (result), error);
867 }
868
869 static void
870 async_ready_splice_callback_wrapper (GObject      *source_object,
871                                      GAsyncResult *res,
872                                      gpointer     _data)
873 {
874   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
875   GOutputStreamClass *class;
876   GTask *task = _data;
877   gssize nspliced;
878   GError *error = NULL;
879
880   g_output_stream_clear_pending (stream);
881   
882   if (g_async_result_legacy_propagate_error (res, &error))
883     nspliced = -1;
884   else
885     {
886       class = G_OUTPUT_STREAM_GET_CLASS (stream);
887       nspliced = class->splice_finish (stream, res, &error);
888     }
889
890   if (nspliced >= 0)
891     g_task_return_int (task, nspliced);
892   else
893     g_task_return_error (task, error);
894   g_object_unref (task);
895 }
896
897 /**
898  * g_output_stream_splice_async:
899  * @stream: a #GOutputStream.
900  * @source: a #GInputStream. 
901  * @flags: a set of #GOutputStreamSpliceFlags.
902  * @io_priority: the io priority of the request.
903  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
904  * @callback: (scope async): a #GAsyncReadyCallback. 
905  * @user_data: (closure): user data passed to @callback.
906  * 
907  * Splices a stream asynchronously.
908  * When the operation is finished @callback will be called.
909  * You can then call g_output_stream_splice_finish() to get the 
910  * result of the operation.
911  *
912  * For the synchronous, blocking version of this function, see 
913  * g_output_stream_splice().
914  **/
915 void
916 g_output_stream_splice_async (GOutputStream            *stream,
917                               GInputStream             *source,
918                               GOutputStreamSpliceFlags  flags,
919                               int                       io_priority,
920                               GCancellable             *cancellable,
921                               GAsyncReadyCallback       callback,
922                               gpointer                  user_data)
923 {
924   GOutputStreamClass *class;
925   GTask *task;
926   GError *error = NULL;
927
928   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
929   g_return_if_fail (G_IS_INPUT_STREAM (source));
930
931   task = g_task_new (stream, cancellable, callback, user_data);
932   g_task_set_source_tag (task, g_output_stream_splice_async);
933   g_task_set_priority (task, io_priority);
934   g_task_set_task_data (task, g_object_ref (source), g_object_unref);
935
936   if (g_input_stream_is_closed (source))
937     {
938       g_task_return_new_error (task,
939                                G_IO_ERROR, G_IO_ERROR_CLOSED,
940                                _("Source stream is already closed"));
941       g_object_unref (task);
942       return;
943     }
944   
945   if (!g_output_stream_set_pending (stream, &error))
946     {
947       g_task_return_error (task, error);
948       g_object_unref (task);
949       return;
950     }
951
952   class = G_OUTPUT_STREAM_GET_CLASS (stream);
953
954   class->splice_async (stream, source, flags, io_priority, cancellable,
955                        async_ready_splice_callback_wrapper, task);
956 }
957
958 /**
959  * g_output_stream_splice_finish:
960  * @stream: a #GOutputStream.
961  * @result: a #GAsyncResult.
962  * @error: a #GError location to store the error occurring, or %NULL to 
963  * ignore.
964  *
965  * Finishes an asynchronous stream splice operation.
966  * 
967  * Returns: a #gssize of the number of bytes spliced. Note that if the
968  *     number of bytes spliced is greater than %G_MAXSSIZE, then that
969  *     will be returned, and there is no way to determine the actual
970  *     number of bytes spliced.
971  **/
972 gssize
973 g_output_stream_splice_finish (GOutputStream  *stream,
974                                GAsyncResult   *result,
975                                GError        **error)
976 {
977   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
978   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
979   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_splice_async), FALSE);
980
981   /* @result is always the GTask created by g_output_stream_splice_async();
982    * we called class->splice_finish() from async_ready_splice_callback_wrapper.
983    */
984   return g_task_propagate_int (G_TASK (result), error);
985 }
986
987 static void
988 async_ready_flush_callback_wrapper (GObject      *source_object,
989                                     GAsyncResult *res,
990                                     gpointer      user_data)
991 {
992   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
993   GOutputStreamClass *class;
994   GTask *task = user_data;
995   gboolean flushed;
996   GError *error = NULL;
997
998   g_output_stream_clear_pending (stream);
999   
1000   if (g_async_result_legacy_propagate_error (res, &error))
1001     flushed = FALSE;
1002   else
1003     {
1004       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1005       flushed = class->flush_finish (stream, res, &error);
1006     }
1007
1008   if (flushed)
1009     g_task_return_boolean (task, TRUE);
1010   else
1011     g_task_return_error (task, error);
1012   g_object_unref (task);
1013 }
1014
1015 /**
1016  * g_output_stream_flush_async:
1017  * @stream: a #GOutputStream.
1018  * @io_priority: the io priority of the request.
1019  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
1020  * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied
1021  * @user_data: (closure): the data to pass to callback function
1022  * 
1023  * Forces an asynchronous write of all user-space buffered data for
1024  * the given @stream.
1025  * For behaviour details see g_output_stream_flush().
1026  *
1027  * When the operation is finished @callback will be 
1028  * called. You can then call g_output_stream_flush_finish() to get the 
1029  * result of the operation.
1030  **/
1031 void
1032 g_output_stream_flush_async (GOutputStream       *stream,
1033                              int                  io_priority,
1034                              GCancellable        *cancellable,
1035                              GAsyncReadyCallback  callback,
1036                              gpointer             user_data)
1037 {
1038   GOutputStreamClass *class;
1039   GTask *task;
1040   GError *error = NULL;
1041
1042   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1043
1044   task = g_task_new (stream, cancellable, callback, user_data);
1045   g_task_set_source_tag (task, g_output_stream_flush_async);
1046   g_task_set_priority (task, io_priority);
1047
1048   if (!g_output_stream_set_pending (stream, &error))
1049     {
1050       g_task_return_error (task, error);
1051       g_object_unref (task);
1052       return;
1053     }
1054
1055   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1056   
1057   if (class->flush_async == NULL)
1058     {
1059       g_task_return_boolean (task, TRUE);
1060       g_object_unref (task);
1061       return;
1062     }
1063       
1064   class->flush_async (stream, io_priority, cancellable,
1065                       async_ready_flush_callback_wrapper, task);
1066 }
1067
1068 /**
1069  * g_output_stream_flush_finish:
1070  * @stream: a #GOutputStream.
1071  * @result: a GAsyncResult.
1072  * @error: a #GError location to store the error occurring, or %NULL to 
1073  * ignore.
1074  * 
1075  * Finishes flushing an output stream.
1076  * 
1077  * Returns: %TRUE if flush operation succeeded, %FALSE otherwise.
1078  **/
1079 gboolean
1080 g_output_stream_flush_finish (GOutputStream  *stream,
1081                               GAsyncResult   *result,
1082                               GError        **error)
1083 {
1084   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1085   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1086   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_flush_async), FALSE);
1087
1088   /* @result is always the GTask created by g_output_stream_flush_async();
1089    * we called class->flush_finish() from async_ready_flush_callback_wrapper.
1090    */
1091   return g_task_propagate_boolean (G_TASK (result), error);
1092 }
1093
1094
1095 static void
1096 async_ready_close_callback_wrapper (GObject      *source_object,
1097                                     GAsyncResult *res,
1098                                     gpointer      user_data)
1099 {
1100   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1101   GOutputStreamClass *class;
1102   GTask *task = user_data;
1103   GError *error = g_task_get_task_data (task);
1104
1105   stream->priv->closing = FALSE;
1106   stream->priv->closed = TRUE;
1107
1108   if (!error && !g_async_result_legacy_propagate_error (res, &error))
1109     {
1110       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1111
1112       class->close_finish (stream, res,
1113                            error ? NULL : &error);
1114     }
1115
1116   g_output_stream_clear_pending (stream);
1117
1118   if (error != NULL)
1119     g_task_return_error (task, error);
1120   else
1121     g_task_return_boolean (task, TRUE);
1122   g_object_unref (task);
1123 }
1124
1125 static void
1126 async_ready_close_flushed_callback_wrapper (GObject      *source_object,
1127                                             GAsyncResult *res,
1128                                             gpointer      user_data)
1129 {
1130   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1131   GOutputStreamClass *class;
1132   GTask *task = user_data;
1133   GError *error = NULL;
1134
1135   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1136
1137   if (!g_async_result_legacy_propagate_error (res, &error))
1138     {
1139       class->flush_finish (stream, res, &error);
1140     }
1141
1142   /* propagate the possible error */
1143   if (error)
1144     g_task_set_task_data (task, error, NULL);
1145
1146   /* we still close, even if there was a flush error */
1147   class->close_async (stream,
1148                       g_task_get_priority (task),
1149                       g_task_get_cancellable (task),
1150                       async_ready_close_callback_wrapper, task);
1151 }
1152
1153 /**
1154  * g_output_stream_close_async:
1155  * @stream: A #GOutputStream.
1156  * @io_priority: the io priority of the request.
1157  * @cancellable: (allow-none): optional cancellable object
1158  * @callback: (scope async): callback to call when the request is satisfied
1159  * @user_data: (closure): the data to pass to callback function
1160  *
1161  * Requests an asynchronous close of the stream, releasing resources 
1162  * related to it. When the operation is finished @callback will be 
1163  * called. You can then call g_output_stream_close_finish() to get 
1164  * the result of the operation.
1165  *
1166  * For behaviour details see g_output_stream_close().
1167  *
1168  * The asyncronous methods have a default fallback that uses threads 
1169  * to implement asynchronicity, so they are optional for inheriting 
1170  * classes. However, if you override one you must override all.
1171  **/
1172 void
1173 g_output_stream_close_async (GOutputStream       *stream,
1174                              int                  io_priority,
1175                              GCancellable        *cancellable,
1176                              GAsyncReadyCallback  callback,
1177                              gpointer             user_data)
1178 {
1179   GOutputStreamClass *class;
1180   GTask *task;
1181   GError *error = NULL;
1182
1183   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1184   
1185   task = g_task_new (stream, cancellable, callback, user_data);
1186   g_task_set_source_tag (task, g_output_stream_close_async);
1187   g_task_set_priority (task, io_priority);
1188
1189   if (stream->priv->closed)
1190     {
1191       g_task_return_boolean (task, TRUE);
1192       g_object_unref (task);
1193       return;
1194     }
1195
1196   if (!g_output_stream_set_pending (stream, &error))
1197     {
1198       g_task_return_error (task, error);
1199       g_object_unref (task);
1200       return;
1201     }
1202   
1203   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1204   stream->priv->closing = TRUE;
1205
1206   /* Call close_async directly if there is no need to flush, or if the flush
1207      can be done sync (in the output stream async close thread) */
1208   if (class->flush_async == NULL ||
1209       (class->flush_async == g_output_stream_real_flush_async &&
1210        (class->flush == NULL || class->close_async == g_output_stream_real_close_async)))
1211     {
1212       class->close_async (stream, io_priority, cancellable,
1213                           async_ready_close_callback_wrapper, task);
1214     }
1215   else
1216     {
1217       /* First do an async flush, then do the async close in the callback
1218          wrapper (see async_ready_close_flushed_callback_wrapper) */
1219       class->flush_async (stream, io_priority, cancellable,
1220                           async_ready_close_flushed_callback_wrapper, task);
1221     }
1222 }
1223
1224 /**
1225  * g_output_stream_close_finish:
1226  * @stream: a #GOutputStream.
1227  * @result: a #GAsyncResult.
1228  * @error: a #GError location to store the error occurring, or %NULL to 
1229  * ignore.
1230  * 
1231  * Closes an output stream.
1232  * 
1233  * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
1234  **/
1235 gboolean
1236 g_output_stream_close_finish (GOutputStream  *stream,
1237                               GAsyncResult   *result,
1238                               GError        **error)
1239 {
1240   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1241   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1242   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_close_async), FALSE);
1243
1244   /* @result is always the GTask created by g_output_stream_close_async();
1245    * we called class->close_finish() from async_ready_close_callback_wrapper.
1246    */
1247   return g_task_propagate_boolean (G_TASK (result), error);
1248 }
1249
1250 /**
1251  * g_output_stream_is_closed:
1252  * @stream: a #GOutputStream.
1253  * 
1254  * Checks if an output stream has already been closed.
1255  * 
1256  * Returns: %TRUE if @stream is closed. %FALSE otherwise. 
1257  **/
1258 gboolean
1259 g_output_stream_is_closed (GOutputStream *stream)
1260 {
1261   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1262   
1263   return stream->priv->closed;
1264 }
1265
1266 /**
1267  * g_output_stream_is_closing:
1268  * @stream: a #GOutputStream.
1269  *
1270  * Checks if an output stream is being closed. This can be
1271  * used inside e.g. a flush implementation to see if the
1272  * flush (or other i/o operation) is called from within
1273  * the closing operation.
1274  *
1275  * Returns: %TRUE if @stream is being closed. %FALSE otherwise.
1276  *
1277  * Since: 2.24
1278  **/
1279 gboolean
1280 g_output_stream_is_closing (GOutputStream *stream)
1281 {
1282   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1283
1284   return stream->priv->closing;
1285 }
1286
1287 /**
1288  * g_output_stream_has_pending:
1289  * @stream: a #GOutputStream.
1290  * 
1291  * Checks if an ouput stream has pending actions.
1292  * 
1293  * Returns: %TRUE if @stream has pending actions. 
1294  **/
1295 gboolean
1296 g_output_stream_has_pending (GOutputStream *stream)
1297 {
1298   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1299   
1300   return stream->priv->pending;
1301 }
1302
1303 /**
1304  * g_output_stream_set_pending:
1305  * @stream: a #GOutputStream.
1306  * @error: a #GError location to store the error occurring, or %NULL to 
1307  * ignore.
1308  * 
1309  * Sets @stream to have actions pending. If the pending flag is
1310  * already set or @stream is closed, it will return %FALSE and set
1311  * @error.
1312  *
1313  * Return value: %TRUE if pending was previously unset and is now set.
1314  **/
1315 gboolean
1316 g_output_stream_set_pending (GOutputStream *stream,
1317                              GError **error)
1318 {
1319   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1320   
1321   if (stream->priv->closed)
1322     {
1323       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1324                            _("Stream is already closed"));
1325       return FALSE;
1326     }
1327   
1328   if (stream->priv->pending)
1329     {
1330       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1331                            /* Translators: This is an error you get if there is
1332                             * already an operation running against this stream when
1333                             * you try to start one */
1334                            _("Stream has outstanding operation"));
1335       return FALSE;
1336     }
1337   
1338   stream->priv->pending = TRUE;
1339   return TRUE;
1340 }
1341
1342 /**
1343  * g_output_stream_clear_pending:
1344  * @stream: output stream
1345  * 
1346  * Clears the pending flag on @stream.
1347  **/
1348 void
1349 g_output_stream_clear_pending (GOutputStream *stream)
1350 {
1351   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1352   
1353   stream->priv->pending = FALSE;
1354 }
1355
1356 /**
1357  * g_output_stream_async_write_is_via_threads:
1358  * @stream: a #GOutputStream.
1359  *
1360  * Checks if an ouput stream's write_async function uses threads.
1361  *
1362  * Returns: %TRUE if @stream's write_async function uses threads.
1363  **/
1364 gboolean
1365 g_output_stream_async_write_is_via_threads (GOutputStream *stream)
1366 {
1367   GOutputStreamClass *class;
1368
1369   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1370
1371   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1372
1373   return (class->write_async == g_output_stream_real_write_async &&
1374       !(G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
1375         g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
1376 }
1377
1378
1379 /********************************************
1380  *   Default implementation of async ops    *
1381  ********************************************/
1382
1383 typedef struct {
1384   const void         *buffer;
1385   gsize               count_requested;
1386   gssize              count_written;
1387 } WriteData;
1388
1389 static void
1390 free_write_data (WriteData *op)
1391 {
1392   g_slice_free (WriteData, op);
1393 }
1394
1395 static void
1396 write_async_thread (GTask        *task,
1397                     gpointer      source_object,
1398                     gpointer      task_data,
1399                     GCancellable *cancellable)
1400 {
1401   GOutputStream *stream = source_object;
1402   WriteData *op = task_data;
1403   GOutputStreamClass *class;
1404   GError *error = NULL;
1405   gssize count_written;
1406
1407   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1408   count_written = class->write_fn (stream, op->buffer, op->count_requested,
1409                                    cancellable, &error);
1410   if (count_written == -1)
1411     g_task_return_error (task, error);
1412   else
1413     g_task_return_int (task, count_written);
1414 }
1415
1416 static void write_async_pollable (GPollableOutputStream *stream,
1417                                   GTask                 *task);
1418
1419 static gboolean
1420 write_async_pollable_ready (GPollableOutputStream *stream,
1421                             gpointer               user_data)
1422 {
1423   GTask *task = user_data;
1424
1425   write_async_pollable (stream, task);
1426   return FALSE;
1427 }
1428
1429 static void
1430 write_async_pollable (GPollableOutputStream *stream,
1431                       GTask                 *task)
1432 {
1433   GError *error = NULL;
1434   WriteData *op = g_task_get_task_data (task);
1435   gssize count_written;
1436
1437   if (g_task_return_error_if_cancelled (task))
1438     return;
1439
1440   count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
1441     write_nonblocking (stream, op->buffer, op->count_requested, &error);
1442
1443   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1444     {
1445       GSource *source;
1446
1447       g_error_free (error);
1448
1449       source = g_pollable_output_stream_create_source (stream,
1450                                                        g_task_get_cancellable (task));
1451       g_task_attach_source (task, source,
1452                             (GSourceFunc) write_async_pollable_ready);
1453       g_source_unref (source);
1454       return;
1455     }
1456
1457   if (count_written == -1)
1458     g_task_return_error (task, error);
1459   else
1460     g_task_return_int (task, count_written);
1461 }
1462
1463 static void
1464 g_output_stream_real_write_async (GOutputStream       *stream,
1465                                   const void          *buffer,
1466                                   gsize                count,
1467                                   int                  io_priority,
1468                                   GCancellable        *cancellable,
1469                                   GAsyncReadyCallback  callback,
1470                                   gpointer             user_data)
1471 {
1472   GTask *task;
1473   WriteData *op;
1474
1475   op = g_slice_new0 (WriteData);
1476   task = g_task_new (stream, cancellable, callback, user_data);
1477   g_task_set_check_cancellable (task, FALSE);
1478   g_task_set_task_data (task, op, (GDestroyNotify) free_write_data);
1479   op->buffer = buffer;
1480   op->count_requested = count;
1481
1482   if (!g_output_stream_async_write_is_via_threads (stream))
1483     write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
1484   else
1485     g_task_run_in_thread (task, write_async_thread);
1486   g_object_unref (task);
1487 }
1488
1489 static gssize
1490 g_output_stream_real_write_finish (GOutputStream  *stream,
1491                                    GAsyncResult   *result,
1492                                    GError        **error)
1493 {
1494   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1495
1496   return g_task_propagate_int (G_TASK (result), error);
1497 }
1498
1499 typedef struct {
1500   GInputStream *source;
1501   GOutputStreamSpliceFlags flags;
1502 } SpliceData;
1503
1504 static void
1505 free_splice_data (SpliceData *op)
1506 {
1507   g_object_unref (op->source);
1508   g_free (op);
1509 }
1510
1511 static void
1512 splice_async_thread (GTask        *task,
1513                      gpointer      source_object,
1514                      gpointer      task_data,
1515                      GCancellable *cancellable)
1516 {
1517   GOutputStream *stream = source_object;
1518   SpliceData *op = task_data;
1519   GOutputStreamClass *class;
1520   GError *error = NULL;
1521   gssize bytes_copied;
1522
1523   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1524   
1525   bytes_copied = class->splice (stream,
1526                                 op->source,
1527                                 op->flags,
1528                                 cancellable,
1529                                 &error);
1530   if (bytes_copied == -1)
1531     g_task_return_error (task, error);
1532   else
1533     g_task_return_int (task, bytes_copied);
1534 }
1535
1536 static void
1537 g_output_stream_real_splice_async (GOutputStream             *stream,
1538                                    GInputStream              *source,
1539                                    GOutputStreamSpliceFlags   flags,
1540                                    int                        io_priority,
1541                                    GCancellable              *cancellable,
1542                                    GAsyncReadyCallback        callback,
1543                                    gpointer                   user_data)
1544 {
1545   GTask *task;
1546   SpliceData *op;
1547
1548   op = g_new0 (SpliceData, 1);
1549   task = g_task_new (stream, cancellable, callback, user_data);
1550   g_task_set_task_data (task, op, (GDestroyNotify)free_splice_data);
1551   op->flags = flags;
1552   op->source = g_object_ref (source);
1553
1554   /* TODO: In the case where both source and destintion have
1555      non-threadbased async calls we can use a true async copy here */
1556   
1557   g_task_run_in_thread (task, splice_async_thread);
1558   g_object_unref (task);
1559 }
1560
1561 static gssize
1562 g_output_stream_real_splice_finish (GOutputStream  *stream,
1563                                     GAsyncResult   *result,
1564                                     GError        **error)
1565 {
1566   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1567
1568   return g_task_propagate_int (G_TASK (result), error);
1569 }
1570
1571
1572 static void
1573 flush_async_thread (GTask        *task,
1574                     gpointer      source_object,
1575                     gpointer      task_data,
1576                     GCancellable *cancellable)
1577 {
1578   GOutputStream *stream = source_object;
1579   GOutputStreamClass *class;
1580   gboolean result;
1581   GError *error = NULL;
1582
1583   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1584   result = TRUE;
1585   if (class->flush)
1586     result = class->flush (stream, cancellable, &error);
1587
1588   if (result)
1589     g_task_return_boolean (task, TRUE);
1590   else
1591     g_task_return_error (task, error);
1592 }
1593
1594 static void
1595 g_output_stream_real_flush_async (GOutputStream       *stream,
1596                                   int                  io_priority,
1597                                   GCancellable        *cancellable,
1598                                   GAsyncReadyCallback  callback,
1599                                   gpointer             user_data)
1600 {
1601   GTask *task;
1602
1603   task = g_task_new (stream, cancellable, callback, user_data);
1604   g_task_set_priority (task, io_priority);
1605   g_task_run_in_thread (task, flush_async_thread);
1606   g_object_unref (task);
1607 }
1608
1609 static gboolean
1610 g_output_stream_real_flush_finish (GOutputStream  *stream,
1611                                    GAsyncResult   *result,
1612                                    GError        **error)
1613 {
1614   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1615
1616   return g_task_propagate_boolean (G_TASK (result), error);
1617 }
1618
1619 static void
1620 close_async_thread (GTask        *task,
1621                     gpointer      source_object,
1622                     gpointer      task_data,
1623                     GCancellable *cancellable)
1624 {
1625   GOutputStream *stream = source_object;
1626   GOutputStreamClass *class;
1627   GError *error = NULL;
1628   gboolean result = TRUE;
1629
1630   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1631
1632   /* Do a flush here if there is a flush function, and we did not have to do
1633    * an async flush before (see g_output_stream_close_async)
1634    */
1635   if (class->flush != NULL &&
1636       (class->flush_async == NULL ||
1637        class->flush_async == g_output_stream_real_flush_async))
1638     {
1639       result = class->flush (stream, cancellable, &error);
1640     }
1641
1642   /* Auto handling of cancelation disabled, and ignore
1643      cancellation, since we want to close things anyway, although
1644      possibly in a quick-n-dirty way. At least we never want to leak
1645      open handles */
1646
1647   if (class->close_fn)
1648     {
1649       /* Make sure to close, even if the flush failed (see sync close) */
1650       if (!result)
1651         class->close_fn (stream, cancellable, NULL);
1652       else
1653         result = class->close_fn (stream, cancellable, &error);
1654     }
1655
1656   if (result)
1657     g_task_return_boolean (task, TRUE);
1658   else
1659     g_task_return_error (task, error);
1660 }
1661
1662 static void
1663 g_output_stream_real_close_async (GOutputStream       *stream,
1664                                   int                  io_priority,
1665                                   GCancellable        *cancellable,
1666                                   GAsyncReadyCallback  callback,
1667                                   gpointer             user_data)
1668 {
1669   GTask *task;
1670
1671   task = g_task_new (stream, cancellable, callback, user_data);
1672   g_task_set_priority (task, io_priority);
1673   g_task_run_in_thread (task, close_async_thread);
1674   g_object_unref (task);
1675 }
1676
1677 static gboolean
1678 g_output_stream_real_close_finish (GOutputStream  *stream,
1679                                    GAsyncResult   *result,
1680                                    GError        **error)
1681 {
1682   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1683
1684   return g_task_propagate_boolean (G_TASK (result), error);
1685 }