gio: Use the new private instance data declaration
[platform/upstream/glib.git] / gio / goutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24 #include "goutputstream.h"
25 #include "gcancellable.h"
26 #include "gasyncresult.h"
27 #include "gtask.h"
28 #include "ginputstream.h"
29 #include "gioerror.h"
30 #include "glibintl.h"
31 #include "gpollableoutputstream.h"
32
33 /**
34  * SECTION:goutputstream
35  * @short_description: Base class for implementing streaming output
36  * @include: gio/gio.h
37  *
38  * #GOutputStream has functions to write to a stream (g_output_stream_write()),
39  * to close a stream (g_output_stream_close()) and to flush pending writes
40  * (g_output_stream_flush()). 
41  *
42  * To copy the content of an input stream to an output stream without 
43  * manually handling the reads and writes, use g_output_stream_splice(). 
44  *
45  * All of these functions have async variants too.
46  **/
47
48 struct _GOutputStreamPrivate {
49   guint closed : 1;
50   guint pending : 1;
51   guint closing : 1;
52   GAsyncReadyCallback outstanding_callback;
53 };
54
55 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GOutputStream, g_output_stream, G_TYPE_OBJECT)
56
57 static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
58                                                     GInputStream              *source,
59                                                     GOutputStreamSpliceFlags   flags,
60                                                     GCancellable              *cancellable,
61                                                     GError                   **error);
62 static void     g_output_stream_real_write_async   (GOutputStream             *stream,
63                                                     const void                *buffer,
64                                                     gsize                      count,
65                                                     int                        io_priority,
66                                                     GCancellable              *cancellable,
67                                                     GAsyncReadyCallback        callback,
68                                                     gpointer                   data);
69 static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
70                                                     GAsyncResult              *result,
71                                                     GError                   **error);
72 static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
73                                                     GInputStream              *source,
74                                                     GOutputStreamSpliceFlags   flags,
75                                                     int                        io_priority,
76                                                     GCancellable              *cancellable,
77                                                     GAsyncReadyCallback        callback,
78                                                     gpointer                   data);
79 static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
80                                                     GAsyncResult              *result,
81                                                     GError                   **error);
82 static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
83                                                     int                        io_priority,
84                                                     GCancellable              *cancellable,
85                                                     GAsyncReadyCallback        callback,
86                                                     gpointer                   data);
87 static gboolean g_output_stream_real_flush_finish  (GOutputStream             *stream,
88                                                     GAsyncResult              *result,
89                                                     GError                   **error);
90 static void     g_output_stream_real_close_async   (GOutputStream             *stream,
91                                                     int                        io_priority,
92                                                     GCancellable              *cancellable,
93                                                     GAsyncReadyCallback        callback,
94                                                     gpointer                   data);
95 static gboolean g_output_stream_real_close_finish  (GOutputStream             *stream,
96                                                     GAsyncResult              *result,
97                                                     GError                   **error);
98 static gboolean _g_output_stream_close_internal    (GOutputStream             *stream,
99                                                     GCancellable              *cancellable,
100                                                     GError                   **error);
101
102 static void
103 g_output_stream_dispose (GObject *object)
104 {
105   GOutputStream *stream;
106
107   stream = G_OUTPUT_STREAM (object);
108   
109   if (!stream->priv->closed)
110     g_output_stream_close (stream, NULL, NULL);
111
112   G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object);
113 }
114
115 static void
116 g_output_stream_class_init (GOutputStreamClass *klass)
117 {
118   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
119
120   gobject_class->dispose = g_output_stream_dispose;
121
122   klass->splice = g_output_stream_real_splice;
123   
124   klass->write_async = g_output_stream_real_write_async;
125   klass->write_finish = g_output_stream_real_write_finish;
126   klass->splice_async = g_output_stream_real_splice_async;
127   klass->splice_finish = g_output_stream_real_splice_finish;
128   klass->flush_async = g_output_stream_real_flush_async;
129   klass->flush_finish = g_output_stream_real_flush_finish;
130   klass->close_async = g_output_stream_real_close_async;
131   klass->close_finish = g_output_stream_real_close_finish;
132 }
133
134 static void
135 g_output_stream_init (GOutputStream *stream)
136 {
137   stream->priv = g_output_stream_get_private (stream);
138 }
139
140 /**
141  * g_output_stream_write:
142  * @stream: a #GOutputStream.
143  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
144  * @count: the number of bytes to write
145  * @cancellable: (allow-none): optional cancellable object
146  * @error: location to store the error occurring, or %NULL to ignore
147  *
148  * Tries to write @count bytes from @buffer into the stream. Will block
149  * during the operation.
150  * 
151  * If count is 0, returns 0 and does nothing. A value of @count
152  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
153  *
154  * On success, the number of bytes written to the stream is returned.
155  * It is not an error if this is not the same as the requested size, as it
156  * can happen e.g. on a partial I/O error, or if there is not enough
157  * storage in the stream. All writes block until at least one byte
158  * is written or an error occurs; 0 is never returned (unless
159  * @count is 0).
160  * 
161  * If @cancellable is not %NULL, then the operation can be cancelled by
162  * triggering the cancellable object from another thread. If the operation
163  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
164  * operation was partially finished when the operation was cancelled the
165  * partial result will be returned, without an error.
166  *
167  * On error -1 is returned and @error is set accordingly.
168  * 
169  * Virtual: write_fn
170  *
171  * Return value: Number of bytes written, or -1 on error
172  **/
173 gssize
174 g_output_stream_write (GOutputStream  *stream,
175                        const void     *buffer,
176                        gsize           count,
177                        GCancellable   *cancellable,
178                        GError        **error)
179 {
180   GOutputStreamClass *class;
181   gssize res;
182
183   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
184   g_return_val_if_fail (buffer != NULL, 0);
185
186   if (count == 0)
187     return 0;
188   
189   if (((gssize) count) < 0)
190     {
191       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
192                    _("Too large count value passed to %s"), G_STRFUNC);
193       return -1;
194     }
195
196   class = G_OUTPUT_STREAM_GET_CLASS (stream);
197
198   if (class->write_fn == NULL) 
199     {
200       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
201                            _("Output stream doesn't implement write"));
202       return -1;
203     }
204   
205   if (!g_output_stream_set_pending (stream, error))
206     return -1;
207   
208   if (cancellable)
209     g_cancellable_push_current (cancellable);
210   
211   res = class->write_fn (stream, buffer, count, cancellable, error);
212   
213   if (cancellable)
214     g_cancellable_pop_current (cancellable);
215   
216   g_output_stream_clear_pending (stream);
217
218   return res; 
219 }
220
221 /**
222  * g_output_stream_write_all:
223  * @stream: a #GOutputStream.
224  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
225  * @count: the number of bytes to write
226  * @bytes_written: (out): location to store the number of bytes that was 
227  *     written to the stream
228  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
229  * @error: location to store the error occurring, or %NULL to ignore
230  *
231  * Tries to write @count bytes from @buffer into the stream. Will block
232  * during the operation.
233  * 
234  * This function is similar to g_output_stream_write(), except it tries to
235  * write as many bytes as requested, only stopping on an error.
236  *
237  * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
238  * is set to @count.
239  * 
240  * If there is an error during the operation %FALSE is returned and @error
241  * is set to indicate the error status, @bytes_written is updated to contain
242  * the number of bytes written into the stream before the error occurred.
243  *
244  * Return value: %TRUE on success, %FALSE if there was an error
245  **/
246 gboolean
247 g_output_stream_write_all (GOutputStream  *stream,
248                            const void     *buffer,
249                            gsize           count,
250                            gsize          *bytes_written,
251                            GCancellable   *cancellable,
252                            GError        **error)
253 {
254   gsize _bytes_written;
255   gssize res;
256
257   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
258   g_return_val_if_fail (buffer != NULL, FALSE);
259
260   _bytes_written = 0;
261   while (_bytes_written < count)
262     {
263       res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
264                                    cancellable, error);
265       if (res == -1)
266         {
267           if (bytes_written)
268             *bytes_written = _bytes_written;
269           return FALSE;
270         }
271       
272       if (res == 0)
273         g_warning ("Write returned zero without error");
274
275       _bytes_written += res;
276     }
277   
278   if (bytes_written)
279     *bytes_written = _bytes_written;
280
281   return TRUE;
282 }
283
284 /**
285  * g_output_stream_write_bytes:
286  * @stream: a #GOutputStream.
287  * @bytes: the #GBytes to write
288  * @cancellable: (allow-none): optional cancellable object
289  * @error: location to store the error occurring, or %NULL to ignore
290  *
291  * Tries to write the data from @bytes into the stream. Will block
292  * during the operation.
293  *
294  * If @bytes is 0-length, returns 0 and does nothing. A #GBytes larger
295  * than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
296  *
297  * On success, the number of bytes written to the stream is returned.
298  * It is not an error if this is not the same as the requested size, as it
299  * can happen e.g. on a partial I/O error, or if there is not enough
300  * storage in the stream. All writes block until at least one byte
301  * is written or an error occurs; 0 is never returned (unless
302  * the size of @bytes is 0).
303  *
304  * If @cancellable is not %NULL, then the operation can be cancelled by
305  * triggering the cancellable object from another thread. If the operation
306  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
307  * operation was partially finished when the operation was cancelled the
308  * partial result will be returned, without an error.
309  *
310  * On error -1 is returned and @error is set accordingly.
311  *
312  * Return value: Number of bytes written, or -1 on error
313  **/
314 gssize
315 g_output_stream_write_bytes (GOutputStream  *stream,
316                              GBytes         *bytes,
317                              GCancellable   *cancellable,
318                              GError        **error)
319 {
320   gsize size;
321   gconstpointer data;
322
323   data = g_bytes_get_data (bytes, &size);
324
325   return g_output_stream_write (stream,
326                                 data, size,
327                                 cancellable,
328                                 error);
329 }
330
331 /**
332  * g_output_stream_flush:
333  * @stream: a #GOutputStream.
334  * @cancellable: (allow-none): optional cancellable object
335  * @error: location to store the error occurring, or %NULL to ignore
336  *
337  * Forces a write of all user-space buffered data for the given
338  * @stream. Will block during the operation. Closing the stream will
339  * implicitly cause a flush.
340  *
341  * This function is optional for inherited classes.
342  * 
343  * If @cancellable is not %NULL, then the operation can be cancelled by
344  * triggering the cancellable object from another thread. If the operation
345  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
346  *
347  * Return value: %TRUE on success, %FALSE on error
348  **/
349 gboolean
350 g_output_stream_flush (GOutputStream  *stream,
351                        GCancellable   *cancellable,
352                        GError        **error)
353 {
354   GOutputStreamClass *class;
355   gboolean res;
356
357   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
358
359   if (!g_output_stream_set_pending (stream, error))
360     return FALSE;
361   
362   class = G_OUTPUT_STREAM_GET_CLASS (stream);
363
364   res = TRUE;
365   if (class->flush)
366     {
367       if (cancellable)
368         g_cancellable_push_current (cancellable);
369       
370       res = class->flush (stream, cancellable, error);
371       
372       if (cancellable)
373         g_cancellable_pop_current (cancellable);
374     }
375   
376   g_output_stream_clear_pending (stream);
377
378   return res;
379 }
380
381 /**
382  * g_output_stream_splice:
383  * @stream: a #GOutputStream.
384  * @source: a #GInputStream.
385  * @flags: a set of #GOutputStreamSpliceFlags.
386  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
387  * @error: a #GError location to store the error occurring, or %NULL to
388  * ignore.
389  *
390  * Splices an input stream into an output stream.
391  *
392  * Returns: a #gssize containing the size of the data spliced, or
393  *     -1 if an error occurred. Note that if the number of bytes
394  *     spliced is greater than %G_MAXSSIZE, then that will be
395  *     returned, and there is no way to determine the actual number
396  *     of bytes spliced.
397  **/
398 gssize
399 g_output_stream_splice (GOutputStream             *stream,
400                         GInputStream              *source,
401                         GOutputStreamSpliceFlags   flags,
402                         GCancellable              *cancellable,
403                         GError                   **error)
404 {
405   GOutputStreamClass *class;
406   gssize bytes_copied;
407
408   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
409   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
410
411   if (g_input_stream_is_closed (source))
412     {
413       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
414                            _("Source stream is already closed"));
415       return -1;
416     }
417
418   if (!g_output_stream_set_pending (stream, error))
419     return -1;
420
421   class = G_OUTPUT_STREAM_GET_CLASS (stream);
422
423   if (cancellable)
424     g_cancellable_push_current (cancellable);
425
426   bytes_copied = class->splice (stream, source, flags, cancellable, error);
427
428   if (cancellable)
429     g_cancellable_pop_current (cancellable);
430
431   g_output_stream_clear_pending (stream);
432
433   return bytes_copied;
434 }
435
436 static gssize
437 g_output_stream_real_splice (GOutputStream             *stream,
438                              GInputStream              *source,
439                              GOutputStreamSpliceFlags   flags,
440                              GCancellable              *cancellable,
441                              GError                   **error)
442 {
443   GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
444   gssize n_read, n_written;
445   gsize bytes_copied;
446   char buffer[8192], *p;
447   gboolean res;
448
449   bytes_copied = 0;
450   if (class->write_fn == NULL)
451     {
452       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
453                            _("Output stream doesn't implement write"));
454       res = FALSE;
455       goto notsupported;
456     }
457
458   res = TRUE;
459   do
460     {
461       n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
462       if (n_read == -1)
463         {
464           res = FALSE;
465           break;
466         }
467
468       if (n_read == 0)
469         break;
470
471       p = buffer;
472       while (n_read > 0)
473         {
474           n_written = class->write_fn (stream, p, n_read, cancellable, error);
475           if (n_written == -1)
476             {
477               res = FALSE;
478               break;
479             }
480
481           p += n_written;
482           n_read -= n_written;
483           bytes_copied += n_written;
484         }
485
486       if (bytes_copied > G_MAXSSIZE)
487         bytes_copied = G_MAXSSIZE;
488     }
489   while (res);
490
491  notsupported:
492   if (!res)
493     error = NULL; /* Ignore further errors */
494
495   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
496     {
497       /* Don't care about errors in source here */
498       g_input_stream_close (source, cancellable, NULL);
499     }
500
501   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
502     {
503       /* But write errors on close are bad! */
504       res = _g_output_stream_close_internal (stream, cancellable, error);
505     }
506
507   if (res)
508     return bytes_copied;
509
510   return -1;
511 }
512
513 /* Must always be called inside
514  * g_output_stream_set_pending()/g_output_stream_clear_pending(). */
515 static gboolean
516 _g_output_stream_close_internal (GOutputStream  *stream,
517                                  GCancellable   *cancellable,
518                                  GError        **error)
519 {
520   GOutputStreamClass *class;
521   gboolean res;
522
523   if (stream->priv->closed)
524     return TRUE;
525
526   class = G_OUTPUT_STREAM_GET_CLASS (stream);
527
528   stream->priv->closing = TRUE;
529
530   if (cancellable)
531     g_cancellable_push_current (cancellable);
532
533   if (class->flush)
534     res = class->flush (stream, cancellable, error);
535   else
536     res = TRUE;
537
538   if (!res)
539     {
540       /* flushing caused the error that we want to return,
541        * but we still want to close the underlying stream if possible
542        */
543       if (class->close_fn)
544         class->close_fn (stream, cancellable, NULL);
545     }
546   else
547     {
548       res = TRUE;
549       if (class->close_fn)
550         res = class->close_fn (stream, cancellable, error);
551     }
552
553   if (cancellable)
554     g_cancellable_pop_current (cancellable);
555
556   stream->priv->closing = FALSE;
557   stream->priv->closed = TRUE;
558
559   return res;
560 }
561
562 /**
563  * g_output_stream_close:
564  * @stream: A #GOutputStream.
565  * @cancellable: (allow-none): optional cancellable object
566  * @error: location to store the error occurring, or %NULL to ignore
567  *
568  * Closes the stream, releasing resources related to it.
569  *
570  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
571  * Closing a stream multiple times will not return an error.
572  *
573  * Closing a stream will automatically flush any outstanding buffers in the
574  * stream.
575  *
576  * Streams will be automatically closed when the last reference
577  * is dropped, but you might want to call this function to make sure 
578  * resources are released as early as possible.
579  *
580  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
581  * open after the stream is closed. See the documentation for the individual
582  * stream for details.
583  *
584  * On failure the first error that happened will be reported, but the close
585  * operation will finish as much as possible. A stream that failed to
586  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
587  * is important to check and report the error to the user, otherwise
588  * there might be a loss of data as all data might not be written.
589  * 
590  * If @cancellable is not %NULL, then the operation can be cancelled by
591  * triggering the cancellable object from another thread. If the operation
592  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
593  * Cancelling a close will still leave the stream closed, but there some streams
594  * can use a faster close that doesn't block to e.g. check errors. On
595  * cancellation (as with any error) there is no guarantee that all written
596  * data will reach the target. 
597  *
598  * Return value: %TRUE on success, %FALSE on failure
599  **/
600 gboolean
601 g_output_stream_close (GOutputStream  *stream,
602                        GCancellable   *cancellable,
603                        GError        **error)
604 {
605   gboolean res;
606
607   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
608
609   if (stream->priv->closed)
610     return TRUE;
611
612   if (!g_output_stream_set_pending (stream, error))
613     return FALSE;
614
615   res = _g_output_stream_close_internal (stream, cancellable, error);
616
617   g_output_stream_clear_pending (stream);
618   
619   return res;
620 }
621
622 static void
623 async_ready_write_callback_wrapper (GObject      *source_object,
624                                     GAsyncResult *res,
625                                     gpointer      user_data)
626 {
627   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
628   GOutputStreamClass *class;
629   GTask *task = user_data;
630   gssize nwrote;
631   GError *error = NULL;
632
633   g_output_stream_clear_pending (stream);
634   
635   if (g_async_result_legacy_propagate_error (res, &error))
636     nwrote = -1;
637   else
638     {
639       class = G_OUTPUT_STREAM_GET_CLASS (stream);
640       nwrote = class->write_finish (stream, res, &error);
641     }
642
643   if (nwrote >= 0)
644     g_task_return_int (task, nwrote);
645   else
646     g_task_return_error (task, error);
647   g_object_unref (task);
648 }
649
650 /**
651  * g_output_stream_write_async:
652  * @stream: A #GOutputStream.
653  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
654  * @count: the number of bytes to write
655  * @io_priority: the io priority of the request.
656  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
657  * @callback: (scope async): callback to call when the request is satisfied
658  * @user_data: (closure): the data to pass to callback function
659  *
660  * Request an asynchronous write of @count bytes from @buffer into 
661  * the stream. When the operation is finished @callback will be called.
662  * You can then call g_output_stream_write_finish() to get the result of the 
663  * operation.
664  *
665  * During an async request no other sync and async calls are allowed, 
666  * and will result in %G_IO_ERROR_PENDING errors. 
667  *
668  * A value of @count larger than %G_MAXSSIZE will cause a 
669  * %G_IO_ERROR_INVALID_ARGUMENT error.
670  *
671  * On success, the number of bytes written will be passed to the
672  * @callback. It is not an error if this is not the same as the 
673  * requested size, as it can happen e.g. on a partial I/O error, 
674  * but generally we try to write as many bytes as requested. 
675  *
676  * You are guaranteed that this method will never fail with
677  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
678  * method will just wait until this changes.
679  *
680  * Any outstanding I/O request with higher priority (lower numerical 
681  * value) will be executed before an outstanding request with lower 
682  * priority. Default priority is %G_PRIORITY_DEFAULT.
683  *
684  * The asyncronous methods have a default fallback that uses threads 
685  * to implement asynchronicity, so they are optional for inheriting 
686  * classes. However, if you override one you must override all.
687  *
688  * For the synchronous, blocking version of this function, see 
689  * g_output_stream_write().
690  **/
691 void
692 g_output_stream_write_async (GOutputStream       *stream,
693                              const void          *buffer,
694                              gsize                count,
695                              int                  io_priority,
696                              GCancellable        *cancellable,
697                              GAsyncReadyCallback  callback,
698                              gpointer             user_data)
699 {
700   GOutputStreamClass *class;
701   GError *error = NULL;
702   GTask *task;
703
704   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
705   g_return_if_fail (buffer != NULL);
706
707   task = g_task_new (stream, cancellable, callback, user_data);
708   g_task_set_source_tag (task, g_output_stream_write_async);
709   g_task_set_priority (task, io_priority);
710
711   if (count == 0)
712     {
713       g_task_return_int (task, 0);
714       g_object_unref (task);
715       return;
716     }
717
718   if (((gssize) count) < 0)
719     {
720       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
721                                _("Too large count value passed to %s"),
722                                G_STRFUNC);
723       g_object_unref (task);
724       return;
725     }
726
727   if (!g_output_stream_set_pending (stream, &error))
728     {
729       g_task_return_error (task, error);
730       g_object_unref (task);
731       return;
732     }
733   
734   class = G_OUTPUT_STREAM_GET_CLASS (stream);
735
736   class->write_async (stream, buffer, count, io_priority, cancellable,
737                       async_ready_write_callback_wrapper, task);
738 }
739
740 /**
741  * g_output_stream_write_finish:
742  * @stream: a #GOutputStream.
743  * @result: a #GAsyncResult.
744  * @error: a #GError location to store the error occurring, or %NULL to 
745  * ignore.
746  * 
747  * Finishes a stream write operation.
748  * 
749  * Returns: a #gssize containing the number of bytes written to the stream.
750  **/
751 gssize
752 g_output_stream_write_finish (GOutputStream  *stream,
753                               GAsyncResult   *result,
754                               GError        **error)
755 {
756   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
757   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
758   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_write_async), FALSE);
759
760   /* @result is always the GTask created by g_output_stream_write_async();
761    * we called class->write_finish() from async_ready_write_callback_wrapper.
762    */
763   return g_task_propagate_int (G_TASK (result), error);
764 }
765
766 static void
767 write_bytes_callback (GObject      *stream,
768                       GAsyncResult *result,
769                       gpointer      user_data)
770 {
771   GTask *task = user_data;
772   GError *error = NULL;
773   gssize nwrote;
774
775   nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream),
776                                          result, &error);
777   if (nwrote == -1)
778     g_task_return_error (task, error);
779   else
780     g_task_return_int (task, nwrote);
781   g_object_unref (task);
782 }
783
784 /**
785  * g_output_stream_write_bytes_async:
786  * @stream: A #GOutputStream.
787  * @bytes: The bytes to write
788  * @io_priority: the io priority of the request.
789  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
790  * @callback: (scope async): callback to call when the request is satisfied
791  * @user_data: (closure): the data to pass to callback function
792  *
793  * Request an asynchronous write of the data in @bytes to the stream.
794  * When the operation is finished @callback will be called. You can
795  * then call g_output_stream_write_bytes_finish() to get the result of
796  * the operation.
797  *
798  * During an async request no other sync and async calls are allowed,
799  * and will result in %G_IO_ERROR_PENDING errors.
800  *
801  * A #GBytes larger than %G_MAXSSIZE will cause a
802  * %G_IO_ERROR_INVALID_ARGUMENT error.
803  *
804  * On success, the number of bytes written will be passed to the
805  * @callback. It is not an error if this is not the same as the
806  * requested size, as it can happen e.g. on a partial I/O error,
807  * but generally we try to write as many bytes as requested.
808  *
809  * You are guaranteed that this method will never fail with
810  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
811  * method will just wait until this changes.
812  *
813  * Any outstanding I/O request with higher priority (lower numerical
814  * value) will be executed before an outstanding request with lower
815  * priority. Default priority is %G_PRIORITY_DEFAULT.
816  *
817  * For the synchronous, blocking version of this function, see
818  * g_output_stream_write_bytes().
819  **/
820 void
821 g_output_stream_write_bytes_async (GOutputStream       *stream,
822                                    GBytes              *bytes,
823                                    int                  io_priority,
824                                    GCancellable        *cancellable,
825                                    GAsyncReadyCallback  callback,
826                                    gpointer             user_data)
827 {
828   GTask *task;
829   gsize size;
830   gconstpointer data;
831
832   data = g_bytes_get_data (bytes, &size);
833
834   task = g_task_new (stream, cancellable, callback, user_data);
835   g_task_set_task_data (task, g_bytes_ref (bytes),
836                         (GDestroyNotify) g_bytes_unref);
837
838   g_output_stream_write_async (stream,
839                                data, size,
840                                io_priority,
841                                cancellable,
842                                write_bytes_callback,
843                                task);
844 }
845
846 /**
847  * g_output_stream_write_bytes_finish:
848  * @stream: a #GOutputStream.
849  * @result: a #GAsyncResult.
850  * @error: a #GError location to store the error occurring, or %NULL to
851  * ignore.
852  *
853  * Finishes a stream write-from-#GBytes operation.
854  *
855  * Returns: a #gssize containing the number of bytes written to the stream.
856  **/
857 gssize
858 g_output_stream_write_bytes_finish (GOutputStream  *stream,
859                                     GAsyncResult   *result,
860                                     GError        **error)
861 {
862   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
863   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
864
865   return g_task_propagate_int (G_TASK (result), error);
866 }
867
868 static void
869 async_ready_splice_callback_wrapper (GObject      *source_object,
870                                      GAsyncResult *res,
871                                      gpointer     _data)
872 {
873   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
874   GOutputStreamClass *class;
875   GTask *task = _data;
876   gssize nspliced;
877   GError *error = NULL;
878
879   g_output_stream_clear_pending (stream);
880   
881   if (g_async_result_legacy_propagate_error (res, &error))
882     nspliced = -1;
883   else
884     {
885       class = G_OUTPUT_STREAM_GET_CLASS (stream);
886       nspliced = class->splice_finish (stream, res, &error);
887     }
888
889   if (nspliced >= 0)
890     g_task_return_int (task, nspliced);
891   else
892     g_task_return_error (task, error);
893   g_object_unref (task);
894 }
895
896 /**
897  * g_output_stream_splice_async:
898  * @stream: a #GOutputStream.
899  * @source: a #GInputStream. 
900  * @flags: a set of #GOutputStreamSpliceFlags.
901  * @io_priority: the io priority of the request.
902  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
903  * @callback: (scope async): a #GAsyncReadyCallback. 
904  * @user_data: (closure): user data passed to @callback.
905  * 
906  * Splices a stream asynchronously.
907  * When the operation is finished @callback will be called.
908  * You can then call g_output_stream_splice_finish() to get the 
909  * result of the operation.
910  *
911  * For the synchronous, blocking version of this function, see 
912  * g_output_stream_splice().
913  **/
914 void
915 g_output_stream_splice_async (GOutputStream            *stream,
916                               GInputStream             *source,
917                               GOutputStreamSpliceFlags  flags,
918                               int                       io_priority,
919                               GCancellable             *cancellable,
920                               GAsyncReadyCallback       callback,
921                               gpointer                  user_data)
922 {
923   GOutputStreamClass *class;
924   GTask *task;
925   GError *error = NULL;
926
927   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
928   g_return_if_fail (G_IS_INPUT_STREAM (source));
929
930   task = g_task_new (stream, cancellable, callback, user_data);
931   g_task_set_source_tag (task, g_output_stream_splice_async);
932   g_task_set_priority (task, io_priority);
933   g_task_set_task_data (task, g_object_ref (source), g_object_unref);
934
935   if (g_input_stream_is_closed (source))
936     {
937       g_task_return_new_error (task,
938                                G_IO_ERROR, G_IO_ERROR_CLOSED,
939                                _("Source stream is already closed"));
940       g_object_unref (task);
941       return;
942     }
943   
944   if (!g_output_stream_set_pending (stream, &error))
945     {
946       g_task_return_error (task, error);
947       g_object_unref (task);
948       return;
949     }
950
951   class = G_OUTPUT_STREAM_GET_CLASS (stream);
952
953   class->splice_async (stream, source, flags, io_priority, cancellable,
954                        async_ready_splice_callback_wrapper, task);
955 }
956
957 /**
958  * g_output_stream_splice_finish:
959  * @stream: a #GOutputStream.
960  * @result: a #GAsyncResult.
961  * @error: a #GError location to store the error occurring, or %NULL to 
962  * ignore.
963  *
964  * Finishes an asynchronous stream splice operation.
965  * 
966  * Returns: a #gssize of the number of bytes spliced. Note that if the
967  *     number of bytes spliced is greater than %G_MAXSSIZE, then that
968  *     will be returned, and there is no way to determine the actual
969  *     number of bytes spliced.
970  **/
971 gssize
972 g_output_stream_splice_finish (GOutputStream  *stream,
973                                GAsyncResult   *result,
974                                GError        **error)
975 {
976   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
977   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
978   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_splice_async), FALSE);
979
980   /* @result is always the GTask created by g_output_stream_splice_async();
981    * we called class->splice_finish() from async_ready_splice_callback_wrapper.
982    */
983   return g_task_propagate_int (G_TASK (result), error);
984 }
985
986 static void
987 async_ready_flush_callback_wrapper (GObject      *source_object,
988                                     GAsyncResult *res,
989                                     gpointer      user_data)
990 {
991   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
992   GOutputStreamClass *class;
993   GTask *task = user_data;
994   gboolean flushed;
995   GError *error = NULL;
996
997   g_output_stream_clear_pending (stream);
998   
999   if (g_async_result_legacy_propagate_error (res, &error))
1000     flushed = FALSE;
1001   else
1002     {
1003       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1004       flushed = class->flush_finish (stream, res, &error);
1005     }
1006
1007   if (flushed)
1008     g_task_return_boolean (task, TRUE);
1009   else
1010     g_task_return_error (task, error);
1011   g_object_unref (task);
1012 }
1013
1014 /**
1015  * g_output_stream_flush_async:
1016  * @stream: a #GOutputStream.
1017  * @io_priority: the io priority of the request.
1018  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
1019  * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied
1020  * @user_data: (closure): the data to pass to callback function
1021  * 
1022  * Forces an asynchronous write of all user-space buffered data for
1023  * the given @stream.
1024  * For behaviour details see g_output_stream_flush().
1025  *
1026  * When the operation is finished @callback will be 
1027  * called. You can then call g_output_stream_flush_finish() to get the 
1028  * result of the operation.
1029  **/
1030 void
1031 g_output_stream_flush_async (GOutputStream       *stream,
1032                              int                  io_priority,
1033                              GCancellable        *cancellable,
1034                              GAsyncReadyCallback  callback,
1035                              gpointer             user_data)
1036 {
1037   GOutputStreamClass *class;
1038   GTask *task;
1039   GError *error = NULL;
1040
1041   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1042
1043   task = g_task_new (stream, cancellable, callback, user_data);
1044   g_task_set_source_tag (task, g_output_stream_flush_async);
1045   g_task_set_priority (task, io_priority);
1046
1047   if (!g_output_stream_set_pending (stream, &error))
1048     {
1049       g_task_return_error (task, error);
1050       g_object_unref (task);
1051       return;
1052     }
1053
1054   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1055   
1056   if (class->flush_async == NULL)
1057     {
1058       g_task_return_boolean (task, TRUE);
1059       g_object_unref (task);
1060       return;
1061     }
1062       
1063   class->flush_async (stream, io_priority, cancellable,
1064                       async_ready_flush_callback_wrapper, task);
1065 }
1066
1067 /**
1068  * g_output_stream_flush_finish:
1069  * @stream: a #GOutputStream.
1070  * @result: a GAsyncResult.
1071  * @error: a #GError location to store the error occurring, or %NULL to 
1072  * ignore.
1073  * 
1074  * Finishes flushing an output stream.
1075  * 
1076  * Returns: %TRUE if flush operation succeeded, %FALSE otherwise.
1077  **/
1078 gboolean
1079 g_output_stream_flush_finish (GOutputStream  *stream,
1080                               GAsyncResult   *result,
1081                               GError        **error)
1082 {
1083   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1084   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1085   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_flush_async), FALSE);
1086
1087   /* @result is always the GTask created by g_output_stream_flush_async();
1088    * we called class->flush_finish() from async_ready_flush_callback_wrapper.
1089    */
1090   return g_task_propagate_boolean (G_TASK (result), error);
1091 }
1092
1093
1094 static void
1095 async_ready_close_callback_wrapper (GObject      *source_object,
1096                                     GAsyncResult *res,
1097                                     gpointer      user_data)
1098 {
1099   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1100   GOutputStreamClass *class;
1101   GTask *task = user_data;
1102   GError *error = g_task_get_task_data (task);
1103
1104   stream->priv->closing = FALSE;
1105   stream->priv->closed = TRUE;
1106
1107   if (!error && !g_async_result_legacy_propagate_error (res, &error))
1108     {
1109       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1110
1111       class->close_finish (stream, res,
1112                            error ? NULL : &error);
1113     }
1114
1115   g_output_stream_clear_pending (stream);
1116
1117   if (error != NULL)
1118     g_task_return_error (task, error);
1119   else
1120     g_task_return_boolean (task, TRUE);
1121   g_object_unref (task);
1122 }
1123
1124 static void
1125 async_ready_close_flushed_callback_wrapper (GObject      *source_object,
1126                                             GAsyncResult *res,
1127                                             gpointer      user_data)
1128 {
1129   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1130   GOutputStreamClass *class;
1131   GTask *task = user_data;
1132   GError *error = NULL;
1133
1134   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1135
1136   if (!g_async_result_legacy_propagate_error (res, &error))
1137     {
1138       class->flush_finish (stream, res, &error);
1139     }
1140
1141   /* propagate the possible error */
1142   if (error)
1143     g_task_set_task_data (task, error, NULL);
1144
1145   /* we still close, even if there was a flush error */
1146   class->close_async (stream,
1147                       g_task_get_priority (task),
1148                       g_task_get_cancellable (task),
1149                       async_ready_close_callback_wrapper, task);
1150 }
1151
1152 /**
1153  * g_output_stream_close_async:
1154  * @stream: A #GOutputStream.
1155  * @io_priority: the io priority of the request.
1156  * @cancellable: (allow-none): optional cancellable object
1157  * @callback: (scope async): callback to call when the request is satisfied
1158  * @user_data: (closure): the data to pass to callback function
1159  *
1160  * Requests an asynchronous close of the stream, releasing resources 
1161  * related to it. When the operation is finished @callback will be 
1162  * called. You can then call g_output_stream_close_finish() to get 
1163  * the result of the operation.
1164  *
1165  * For behaviour details see g_output_stream_close().
1166  *
1167  * The asyncronous methods have a default fallback that uses threads 
1168  * to implement asynchronicity, so they are optional for inheriting 
1169  * classes. However, if you override one you must override all.
1170  **/
1171 void
1172 g_output_stream_close_async (GOutputStream       *stream,
1173                              int                  io_priority,
1174                              GCancellable        *cancellable,
1175                              GAsyncReadyCallback  callback,
1176                              gpointer             user_data)
1177 {
1178   GOutputStreamClass *class;
1179   GTask *task;
1180   GError *error = NULL;
1181
1182   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1183   
1184   task = g_task_new (stream, cancellable, callback, user_data);
1185   g_task_set_source_tag (task, g_output_stream_close_async);
1186   g_task_set_priority (task, io_priority);
1187
1188   if (stream->priv->closed)
1189     {
1190       g_task_return_boolean (task, TRUE);
1191       g_object_unref (task);
1192       return;
1193     }
1194
1195   if (!g_output_stream_set_pending (stream, &error))
1196     {
1197       g_task_return_error (task, error);
1198       g_object_unref (task);
1199       return;
1200     }
1201   
1202   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1203   stream->priv->closing = TRUE;
1204
1205   /* Call close_async directly if there is no need to flush, or if the flush
1206      can be done sync (in the output stream async close thread) */
1207   if (class->flush_async == NULL ||
1208       (class->flush_async == g_output_stream_real_flush_async &&
1209        (class->flush == NULL || class->close_async == g_output_stream_real_close_async)))
1210     {
1211       class->close_async (stream, io_priority, cancellable,
1212                           async_ready_close_callback_wrapper, task);
1213     }
1214   else
1215     {
1216       /* First do an async flush, then do the async close in the callback
1217          wrapper (see async_ready_close_flushed_callback_wrapper) */
1218       class->flush_async (stream, io_priority, cancellable,
1219                           async_ready_close_flushed_callback_wrapper, task);
1220     }
1221 }
1222
1223 /**
1224  * g_output_stream_close_finish:
1225  * @stream: a #GOutputStream.
1226  * @result: a #GAsyncResult.
1227  * @error: a #GError location to store the error occurring, or %NULL to 
1228  * ignore.
1229  * 
1230  * Closes an output stream.
1231  * 
1232  * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
1233  **/
1234 gboolean
1235 g_output_stream_close_finish (GOutputStream  *stream,
1236                               GAsyncResult   *result,
1237                               GError        **error)
1238 {
1239   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1240   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1241   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_close_async), FALSE);
1242
1243   /* @result is always the GTask created by g_output_stream_close_async();
1244    * we called class->close_finish() from async_ready_close_callback_wrapper.
1245    */
1246   return g_task_propagate_boolean (G_TASK (result), error);
1247 }
1248
1249 /**
1250  * g_output_stream_is_closed:
1251  * @stream: a #GOutputStream.
1252  * 
1253  * Checks if an output stream has already been closed.
1254  * 
1255  * Returns: %TRUE if @stream is closed. %FALSE otherwise. 
1256  **/
1257 gboolean
1258 g_output_stream_is_closed (GOutputStream *stream)
1259 {
1260   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1261   
1262   return stream->priv->closed;
1263 }
1264
1265 /**
1266  * g_output_stream_is_closing:
1267  * @stream: a #GOutputStream.
1268  *
1269  * Checks if an output stream is being closed. This can be
1270  * used inside e.g. a flush implementation to see if the
1271  * flush (or other i/o operation) is called from within
1272  * the closing operation.
1273  *
1274  * Returns: %TRUE if @stream is being closed. %FALSE otherwise.
1275  *
1276  * Since: 2.24
1277  **/
1278 gboolean
1279 g_output_stream_is_closing (GOutputStream *stream)
1280 {
1281   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1282
1283   return stream->priv->closing;
1284 }
1285
1286 /**
1287  * g_output_stream_has_pending:
1288  * @stream: a #GOutputStream.
1289  * 
1290  * Checks if an ouput stream has pending actions.
1291  * 
1292  * Returns: %TRUE if @stream has pending actions. 
1293  **/
1294 gboolean
1295 g_output_stream_has_pending (GOutputStream *stream)
1296 {
1297   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1298   
1299   return stream->priv->pending;
1300 }
1301
1302 /**
1303  * g_output_stream_set_pending:
1304  * @stream: a #GOutputStream.
1305  * @error: a #GError location to store the error occurring, or %NULL to 
1306  * ignore.
1307  * 
1308  * Sets @stream to have actions pending. If the pending flag is
1309  * already set or @stream is closed, it will return %FALSE and set
1310  * @error.
1311  *
1312  * Return value: %TRUE if pending was previously unset and is now set.
1313  **/
1314 gboolean
1315 g_output_stream_set_pending (GOutputStream *stream,
1316                              GError **error)
1317 {
1318   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1319   
1320   if (stream->priv->closed)
1321     {
1322       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1323                            _("Stream is already closed"));
1324       return FALSE;
1325     }
1326   
1327   if (stream->priv->pending)
1328     {
1329       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1330                            /* Translators: This is an error you get if there is
1331                             * already an operation running against this stream when
1332                             * you try to start one */
1333                            _("Stream has outstanding operation"));
1334       return FALSE;
1335     }
1336   
1337   stream->priv->pending = TRUE;
1338   return TRUE;
1339 }
1340
1341 /**
1342  * g_output_stream_clear_pending:
1343  * @stream: output stream
1344  * 
1345  * Clears the pending flag on @stream.
1346  **/
1347 void
1348 g_output_stream_clear_pending (GOutputStream *stream)
1349 {
1350   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1351   
1352   stream->priv->pending = FALSE;
1353 }
1354
1355
1356 /********************************************
1357  *   Default implementation of async ops    *
1358  ********************************************/
1359
1360 typedef struct {
1361   const void         *buffer;
1362   gsize               count_requested;
1363   gssize              count_written;
1364 } WriteData;
1365
1366 static void
1367 free_write_data (WriteData *op)
1368 {
1369   g_slice_free (WriteData, op);
1370 }
1371
1372 static void
1373 write_async_thread (GTask        *task,
1374                     gpointer      source_object,
1375                     gpointer      task_data,
1376                     GCancellable *cancellable)
1377 {
1378   GOutputStream *stream = source_object;
1379   WriteData *op = task_data;
1380   GOutputStreamClass *class;
1381   GError *error = NULL;
1382   gssize count_written;
1383
1384   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1385   count_written = class->write_fn (stream, op->buffer, op->count_requested,
1386                                    cancellable, &error);
1387   if (count_written == -1)
1388     g_task_return_error (task, error);
1389   else
1390     g_task_return_int (task, count_written);
1391 }
1392
1393 static void write_async_pollable (GPollableOutputStream *stream,
1394                                   GTask                 *task);
1395
1396 static gboolean
1397 write_async_pollable_ready (GPollableOutputStream *stream,
1398                             gpointer               user_data)
1399 {
1400   GTask *task = user_data;
1401
1402   write_async_pollable (stream, task);
1403   return FALSE;
1404 }
1405
1406 static void
1407 write_async_pollable (GPollableOutputStream *stream,
1408                       GTask                 *task)
1409 {
1410   GError *error = NULL;
1411   WriteData *op = g_task_get_task_data (task);
1412   gssize count_written;
1413
1414   if (g_task_return_error_if_cancelled (task))
1415     return;
1416
1417   count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
1418     write_nonblocking (stream, op->buffer, op->count_requested, &error);
1419
1420   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1421     {
1422       GSource *source;
1423
1424       g_error_free (error);
1425
1426       source = g_pollable_output_stream_create_source (stream,
1427                                                        g_task_get_cancellable (task));
1428       g_task_attach_source (task, source,
1429                             (GSourceFunc) write_async_pollable_ready);
1430       g_source_unref (source);
1431       return;
1432     }
1433
1434   if (count_written == -1)
1435     g_task_return_error (task, error);
1436   else
1437     g_task_return_int (task, count_written);
1438 }
1439
1440 static void
1441 g_output_stream_real_write_async (GOutputStream       *stream,
1442                                   const void          *buffer,
1443                                   gsize                count,
1444                                   int                  io_priority,
1445                                   GCancellable        *cancellable,
1446                                   GAsyncReadyCallback  callback,
1447                                   gpointer             user_data)
1448 {
1449   GTask *task;
1450   WriteData *op;
1451
1452   op = g_slice_new0 (WriteData);
1453   task = g_task_new (stream, cancellable, callback, user_data);
1454   g_task_set_check_cancellable (task, FALSE);
1455   g_task_set_task_data (task, op, (GDestroyNotify) free_write_data);
1456   op->buffer = buffer;
1457   op->count_requested = count;
1458
1459   if (G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
1460       g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream)))
1461     write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
1462   else
1463     g_task_run_in_thread (task, write_async_thread);
1464   g_object_unref (task);
1465 }
1466
1467 static gssize
1468 g_output_stream_real_write_finish (GOutputStream  *stream,
1469                                    GAsyncResult   *result,
1470                                    GError        **error)
1471 {
1472   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1473
1474   return g_task_propagate_int (G_TASK (result), error);
1475 }
1476
1477 typedef struct {
1478   GInputStream *source;
1479   GOutputStreamSpliceFlags flags;
1480 } SpliceData;
1481
1482 static void
1483 free_splice_data (SpliceData *op)
1484 {
1485   g_object_unref (op->source);
1486   g_free (op);
1487 }
1488
1489 static void
1490 splice_async_thread (GTask        *task,
1491                      gpointer      source_object,
1492                      gpointer      task_data,
1493                      GCancellable *cancellable)
1494 {
1495   GOutputStream *stream = source_object;
1496   SpliceData *op = task_data;
1497   GOutputStreamClass *class;
1498   GError *error = NULL;
1499   gssize bytes_copied;
1500
1501   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1502   
1503   bytes_copied = class->splice (stream,
1504                                 op->source,
1505                                 op->flags,
1506                                 cancellable,
1507                                 &error);
1508   if (bytes_copied == -1)
1509     g_task_return_error (task, error);
1510   else
1511     g_task_return_int (task, bytes_copied);
1512 }
1513
1514 static void
1515 g_output_stream_real_splice_async (GOutputStream             *stream,
1516                                    GInputStream              *source,
1517                                    GOutputStreamSpliceFlags   flags,
1518                                    int                        io_priority,
1519                                    GCancellable              *cancellable,
1520                                    GAsyncReadyCallback        callback,
1521                                    gpointer                   user_data)
1522 {
1523   GTask *task;
1524   SpliceData *op;
1525
1526   op = g_new0 (SpliceData, 1);
1527   task = g_task_new (stream, cancellable, callback, user_data);
1528   g_task_set_task_data (task, op, (GDestroyNotify)free_splice_data);
1529   op->flags = flags;
1530   op->source = g_object_ref (source);
1531
1532   /* TODO: In the case where both source and destintion have
1533      non-threadbased async calls we can use a true async copy here */
1534   
1535   g_task_run_in_thread (task, splice_async_thread);
1536   g_object_unref (task);
1537 }
1538
1539 static gssize
1540 g_output_stream_real_splice_finish (GOutputStream  *stream,
1541                                     GAsyncResult   *result,
1542                                     GError        **error)
1543 {
1544   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1545
1546   return g_task_propagate_int (G_TASK (result), error);
1547 }
1548
1549
1550 static void
1551 flush_async_thread (GTask        *task,
1552                     gpointer      source_object,
1553                     gpointer      task_data,
1554                     GCancellable *cancellable)
1555 {
1556   GOutputStream *stream = source_object;
1557   GOutputStreamClass *class;
1558   gboolean result;
1559   GError *error = NULL;
1560
1561   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1562   result = TRUE;
1563   if (class->flush)
1564     result = class->flush (stream, cancellable, &error);
1565
1566   if (result)
1567     g_task_return_boolean (task, TRUE);
1568   else
1569     g_task_return_error (task, error);
1570 }
1571
1572 static void
1573 g_output_stream_real_flush_async (GOutputStream       *stream,
1574                                   int                  io_priority,
1575                                   GCancellable        *cancellable,
1576                                   GAsyncReadyCallback  callback,
1577                                   gpointer             user_data)
1578 {
1579   GTask *task;
1580
1581   task = g_task_new (stream, cancellable, callback, user_data);
1582   g_task_set_priority (task, io_priority);
1583   g_task_run_in_thread (task, flush_async_thread);
1584   g_object_unref (task);
1585 }
1586
1587 static gboolean
1588 g_output_stream_real_flush_finish (GOutputStream  *stream,
1589                                    GAsyncResult   *result,
1590                                    GError        **error)
1591 {
1592   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1593
1594   return g_task_propagate_boolean (G_TASK (result), error);
1595 }
1596
1597 static void
1598 close_async_thread (GTask        *task,
1599                     gpointer      source_object,
1600                     gpointer      task_data,
1601                     GCancellable *cancellable)
1602 {
1603   GOutputStream *stream = source_object;
1604   GOutputStreamClass *class;
1605   GError *error = NULL;
1606   gboolean result = TRUE;
1607
1608   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1609
1610   /* Do a flush here if there is a flush function, and we did not have to do
1611    * an async flush before (see g_output_stream_close_async)
1612    */
1613   if (class->flush != NULL &&
1614       (class->flush_async == NULL ||
1615        class->flush_async == g_output_stream_real_flush_async))
1616     {
1617       result = class->flush (stream, cancellable, &error);
1618     }
1619
1620   /* Auto handling of cancelation disabled, and ignore
1621      cancellation, since we want to close things anyway, although
1622      possibly in a quick-n-dirty way. At least we never want to leak
1623      open handles */
1624
1625   if (class->close_fn)
1626     {
1627       /* Make sure to close, even if the flush failed (see sync close) */
1628       if (!result)
1629         class->close_fn (stream, cancellable, NULL);
1630       else
1631         result = class->close_fn (stream, cancellable, &error);
1632     }
1633
1634   if (result)
1635     g_task_return_boolean (task, TRUE);
1636   else
1637     g_task_return_error (task, error);
1638 }
1639
1640 static void
1641 g_output_stream_real_close_async (GOutputStream       *stream,
1642                                   int                  io_priority,
1643                                   GCancellable        *cancellable,
1644                                   GAsyncReadyCallback  callback,
1645                                   gpointer             user_data)
1646 {
1647   GTask *task;
1648
1649   task = g_task_new (stream, cancellable, callback, user_data);
1650   g_task_set_priority (task, io_priority);
1651   g_task_run_in_thread (task, close_async_thread);
1652   g_object_unref (task);
1653 }
1654
1655 static gboolean
1656 g_output_stream_real_close_finish (GOutputStream  *stream,
1657                                    GAsyncResult   *result,
1658                                    GError        **error)
1659 {
1660   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1661
1662   return g_task_propagate_boolean (G_TASK (result), error);
1663 }