GOutputStream: Remove unused SpliceUserData struct
[platform/upstream/glib.git] / gio / goutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24 #include "goutputstream.h"
25 #include "gcancellable.h"
26 #include "gasyncresult.h"
27 #include "gtask.h"
28 #include "ginputstream.h"
29 #include "gioerror.h"
30 #include "glibintl.h"
31 #include "gpollableoutputstream.h"
32
33 /**
34  * SECTION:goutputstream
35  * @short_description: Base class for implementing streaming output
36  * @include: gio/gio.h
37  *
38  * #GOutputStream has functions to write to a stream (g_output_stream_write()),
39  * to close a stream (g_output_stream_close()) and to flush pending writes
40  * (g_output_stream_flush()). 
41  *
42  * To copy the content of an input stream to an output stream without 
43  * manually handling the reads and writes, use g_output_stream_splice(). 
44  *
45  * All of these functions have async variants too.
46  **/
47
48 G_DEFINE_ABSTRACT_TYPE (GOutputStream, g_output_stream, G_TYPE_OBJECT);
49
50 struct _GOutputStreamPrivate {
51   guint closed : 1;
52   guint pending : 1;
53   guint closing : 1;
54   GAsyncReadyCallback outstanding_callback;
55 };
56
57 static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
58                                                     GInputStream              *source,
59                                                     GOutputStreamSpliceFlags   flags,
60                                                     GCancellable              *cancellable,
61                                                     GError                   **error);
62 static void     g_output_stream_real_write_async   (GOutputStream             *stream,
63                                                     const void                *buffer,
64                                                     gsize                      count,
65                                                     int                        io_priority,
66                                                     GCancellable              *cancellable,
67                                                     GAsyncReadyCallback        callback,
68                                                     gpointer                   data);
69 static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
70                                                     GAsyncResult              *result,
71                                                     GError                   **error);
72 static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
73                                                     GInputStream              *source,
74                                                     GOutputStreamSpliceFlags   flags,
75                                                     int                        io_priority,
76                                                     GCancellable              *cancellable,
77                                                     GAsyncReadyCallback        callback,
78                                                     gpointer                   data);
79 static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
80                                                     GAsyncResult              *result,
81                                                     GError                   **error);
82 static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
83                                                     int                        io_priority,
84                                                     GCancellable              *cancellable,
85                                                     GAsyncReadyCallback        callback,
86                                                     gpointer                   data);
87 static gboolean g_output_stream_real_flush_finish  (GOutputStream             *stream,
88                                                     GAsyncResult              *result,
89                                                     GError                   **error);
90 static void     g_output_stream_real_close_async   (GOutputStream             *stream,
91                                                     int                        io_priority,
92                                                     GCancellable              *cancellable,
93                                                     GAsyncReadyCallback        callback,
94                                                     gpointer                   data);
95 static gboolean g_output_stream_real_close_finish  (GOutputStream             *stream,
96                                                     GAsyncResult              *result,
97                                                     GError                   **error);
98 static gboolean _g_output_stream_close_internal    (GOutputStream             *stream,
99                                                     GCancellable              *cancellable,
100                                                     GError                   **error);
101
102 static void
103 g_output_stream_finalize (GObject *object)
104 {
105   G_OBJECT_CLASS (g_output_stream_parent_class)->finalize (object);
106 }
107
108 static void
109 g_output_stream_dispose (GObject *object)
110 {
111   GOutputStream *stream;
112
113   stream = G_OUTPUT_STREAM (object);
114   
115   if (!stream->priv->closed)
116     g_output_stream_close (stream, NULL, NULL);
117
118   G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object);
119 }
120
121 static void
122 g_output_stream_class_init (GOutputStreamClass *klass)
123 {
124   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
125   
126   g_type_class_add_private (klass, sizeof (GOutputStreamPrivate));
127   
128   gobject_class->finalize = g_output_stream_finalize;
129   gobject_class->dispose = g_output_stream_dispose;
130
131   klass->splice = g_output_stream_real_splice;
132   
133   klass->write_async = g_output_stream_real_write_async;
134   klass->write_finish = g_output_stream_real_write_finish;
135   klass->splice_async = g_output_stream_real_splice_async;
136   klass->splice_finish = g_output_stream_real_splice_finish;
137   klass->flush_async = g_output_stream_real_flush_async;
138   klass->flush_finish = g_output_stream_real_flush_finish;
139   klass->close_async = g_output_stream_real_close_async;
140   klass->close_finish = g_output_stream_real_close_finish;
141 }
142
143 static void
144 g_output_stream_init (GOutputStream *stream)
145 {
146   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
147                                               G_TYPE_OUTPUT_STREAM,
148                                               GOutputStreamPrivate);
149 }
150
151 /**
152  * g_output_stream_write:
153  * @stream: a #GOutputStream.
154  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
155  * @count: the number of bytes to write
156  * @cancellable: (allow-none): optional cancellable object
157  * @error: location to store the error occurring, or %NULL to ignore
158  *
159  * Tries to write @count bytes from @buffer into the stream. Will block
160  * during the operation.
161  * 
162  * If count is 0, returns 0 and does nothing. A value of @count
163  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
164  *
165  * On success, the number of bytes written to the stream is returned.
166  * It is not an error if this is not the same as the requested size, as it
167  * can happen e.g. on a partial I/O error, or if there is not enough
168  * storage in the stream. All writes block until at least one byte
169  * is written or an error occurs; 0 is never returned (unless
170  * @count is 0).
171  * 
172  * If @cancellable is not %NULL, then the operation can be cancelled by
173  * triggering the cancellable object from another thread. If the operation
174  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
175  * operation was partially finished when the operation was cancelled the
176  * partial result will be returned, without an error.
177  *
178  * On error -1 is returned and @error is set accordingly.
179  * 
180  * Virtual: write_fn
181  *
182  * Return value: Number of bytes written, or -1 on error
183  **/
184 gssize
185 g_output_stream_write (GOutputStream  *stream,
186                        const void     *buffer,
187                        gsize           count,
188                        GCancellable   *cancellable,
189                        GError        **error)
190 {
191   GOutputStreamClass *class;
192   gssize res;
193
194   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
195   g_return_val_if_fail (buffer != NULL, 0);
196
197   if (count == 0)
198     return 0;
199   
200   if (((gssize) count) < 0)
201     {
202       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
203                    _("Too large count value passed to %s"), G_STRFUNC);
204       return -1;
205     }
206
207   class = G_OUTPUT_STREAM_GET_CLASS (stream);
208
209   if (class->write_fn == NULL) 
210     {
211       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
212                            _("Output stream doesn't implement write"));
213       return -1;
214     }
215   
216   if (!g_output_stream_set_pending (stream, error))
217     return -1;
218   
219   if (cancellable)
220     g_cancellable_push_current (cancellable);
221   
222   res = class->write_fn (stream, buffer, count, cancellable, error);
223   
224   if (cancellable)
225     g_cancellable_pop_current (cancellable);
226   
227   g_output_stream_clear_pending (stream);
228
229   return res; 
230 }
231
232 /**
233  * g_output_stream_write_all:
234  * @stream: a #GOutputStream.
235  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
236  * @count: the number of bytes to write
237  * @bytes_written: (out): location to store the number of bytes that was 
238  *     written to the stream
239  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
240  * @error: location to store the error occurring, or %NULL to ignore
241  *
242  * Tries to write @count bytes from @buffer into the stream. Will block
243  * during the operation.
244  * 
245  * This function is similar to g_output_stream_write(), except it tries to
246  * write as many bytes as requested, only stopping on an error.
247  *
248  * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
249  * is set to @count.
250  * 
251  * If there is an error during the operation %FALSE is returned and @error
252  * is set to indicate the error status, @bytes_written is updated to contain
253  * the number of bytes written into the stream before the error occurred.
254  *
255  * Return value: %TRUE on success, %FALSE if there was an error
256  **/
257 gboolean
258 g_output_stream_write_all (GOutputStream  *stream,
259                            const void     *buffer,
260                            gsize           count,
261                            gsize          *bytes_written,
262                            GCancellable   *cancellable,
263                            GError        **error)
264 {
265   gsize _bytes_written;
266   gssize res;
267
268   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
269   g_return_val_if_fail (buffer != NULL, FALSE);
270
271   _bytes_written = 0;
272   while (_bytes_written < count)
273     {
274       res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
275                                    cancellable, error);
276       if (res == -1)
277         {
278           if (bytes_written)
279             *bytes_written = _bytes_written;
280           return FALSE;
281         }
282       
283       if (res == 0)
284         g_warning ("Write returned zero without error");
285
286       _bytes_written += res;
287     }
288   
289   if (bytes_written)
290     *bytes_written = _bytes_written;
291
292   return TRUE;
293 }
294
295 /**
296  * g_output_stream_write_bytes:
297  * @stream: a #GOutputStream.
298  * @bytes: the #GBytes to write
299  * @cancellable: (allow-none): optional cancellable object
300  * @error: location to store the error occurring, or %NULL to ignore
301  *
302  * Tries to write the data from @bytes into the stream. Will block
303  * during the operation.
304  *
305  * If @bytes is 0-length, returns 0 and does nothing. A #GBytes larger
306  * than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
307  *
308  * On success, the number of bytes written to the stream is returned.
309  * It is not an error if this is not the same as the requested size, as it
310  * can happen e.g. on a partial I/O error, or if there is not enough
311  * storage in the stream. All writes block until at least one byte
312  * is written or an error occurs; 0 is never returned (unless
313  * the size of @bytes is 0).
314  *
315  * If @cancellable is not %NULL, then the operation can be cancelled by
316  * triggering the cancellable object from another thread. If the operation
317  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
318  * operation was partially finished when the operation was cancelled the
319  * partial result will be returned, without an error.
320  *
321  * On error -1 is returned and @error is set accordingly.
322  *
323  * Return value: Number of bytes written, or -1 on error
324  **/
325 gssize
326 g_output_stream_write_bytes (GOutputStream  *stream,
327                              GBytes         *bytes,
328                              GCancellable   *cancellable,
329                              GError        **error)
330 {
331   gsize size;
332   gconstpointer data;
333
334   data = g_bytes_get_data (bytes, &size);
335
336   return g_output_stream_write (stream,
337                                 data, size,
338                                 cancellable,
339                                 error);
340 }
341
342 /**
343  * g_output_stream_flush:
344  * @stream: a #GOutputStream.
345  * @cancellable: (allow-none): optional cancellable object
346  * @error: location to store the error occurring, or %NULL to ignore
347  *
348  * Forces a write of all user-space buffered data for the given
349  * @stream. Will block during the operation. Closing the stream will
350  * implicitly cause a flush.
351  *
352  * This function is optional for inherited classes.
353  * 
354  * If @cancellable is not %NULL, then the operation can be cancelled by
355  * triggering the cancellable object from another thread. If the operation
356  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
357  *
358  * Return value: %TRUE on success, %FALSE on error
359  **/
360 gboolean
361 g_output_stream_flush (GOutputStream  *stream,
362                        GCancellable   *cancellable,
363                        GError        **error)
364 {
365   GOutputStreamClass *class;
366   gboolean res;
367
368   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
369
370   if (!g_output_stream_set_pending (stream, error))
371     return FALSE;
372   
373   class = G_OUTPUT_STREAM_GET_CLASS (stream);
374
375   res = TRUE;
376   if (class->flush)
377     {
378       if (cancellable)
379         g_cancellable_push_current (cancellable);
380       
381       res = class->flush (stream, cancellable, error);
382       
383       if (cancellable)
384         g_cancellable_pop_current (cancellable);
385     }
386   
387   g_output_stream_clear_pending (stream);
388
389   return res;
390 }
391
392 /**
393  * g_output_stream_splice:
394  * @stream: a #GOutputStream.
395  * @source: a #GInputStream.
396  * @flags: a set of #GOutputStreamSpliceFlags.
397  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
398  * @error: a #GError location to store the error occurring, or %NULL to
399  * ignore.
400  *
401  * Splices an input stream into an output stream.
402  *
403  * Returns: a #gssize containing the size of the data spliced, or
404  *     -1 if an error occurred. Note that if the number of bytes
405  *     spliced is greater than %G_MAXSSIZE, then that will be
406  *     returned, and there is no way to determine the actual number
407  *     of bytes spliced.
408  **/
409 gssize
410 g_output_stream_splice (GOutputStream             *stream,
411                         GInputStream              *source,
412                         GOutputStreamSpliceFlags   flags,
413                         GCancellable              *cancellable,
414                         GError                   **error)
415 {
416   GOutputStreamClass *class;
417   gssize bytes_copied;
418
419   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
420   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
421
422   if (g_input_stream_is_closed (source))
423     {
424       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
425                            _("Source stream is already closed"));
426       return -1;
427     }
428
429   if (!g_output_stream_set_pending (stream, error))
430     return -1;
431
432   class = G_OUTPUT_STREAM_GET_CLASS (stream);
433
434   if (cancellable)
435     g_cancellable_push_current (cancellable);
436
437   bytes_copied = class->splice (stream, source, flags, cancellable, error);
438
439   if (cancellable)
440     g_cancellable_pop_current (cancellable);
441
442   g_output_stream_clear_pending (stream);
443
444   return bytes_copied;
445 }
446
447 static gssize
448 g_output_stream_real_splice (GOutputStream             *stream,
449                              GInputStream              *source,
450                              GOutputStreamSpliceFlags   flags,
451                              GCancellable              *cancellable,
452                              GError                   **error)
453 {
454   GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
455   gssize n_read, n_written;
456   gsize bytes_copied;
457   char buffer[8192], *p;
458   gboolean res;
459
460   bytes_copied = 0;
461   if (class->write_fn == NULL)
462     {
463       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
464                            _("Output stream doesn't implement write"));
465       res = FALSE;
466       goto notsupported;
467     }
468
469   res = TRUE;
470   do
471     {
472       n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
473       if (n_read == -1)
474         {
475           res = FALSE;
476           break;
477         }
478
479       if (n_read == 0)
480         break;
481
482       p = buffer;
483       while (n_read > 0)
484         {
485           n_written = class->write_fn (stream, p, n_read, cancellable, error);
486           if (n_written == -1)
487             {
488               res = FALSE;
489               break;
490             }
491
492           p += n_written;
493           n_read -= n_written;
494           bytes_copied += n_written;
495         }
496
497       if (bytes_copied > G_MAXSSIZE)
498         bytes_copied = G_MAXSSIZE;
499     }
500   while (res);
501
502  notsupported:
503   if (!res)
504     error = NULL; /* Ignore further errors */
505
506   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
507     {
508       /* Don't care about errors in source here */
509       g_input_stream_close (source, cancellable, NULL);
510     }
511
512   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
513     {
514       /* But write errors on close are bad! */
515       res = _g_output_stream_close_internal (stream, cancellable, error);
516     }
517
518   if (res)
519     return bytes_copied;
520
521   return -1;
522 }
523
524 /* Must always be called inside
525  * g_output_stream_set_pending()/g_output_stream_clear_pending(). */
526 static gboolean
527 _g_output_stream_close_internal (GOutputStream  *stream,
528                                  GCancellable   *cancellable,
529                                  GError        **error)
530 {
531   GOutputStreamClass *class;
532   gboolean res;
533
534   if (stream->priv->closed)
535     return TRUE;
536
537   class = G_OUTPUT_STREAM_GET_CLASS (stream);
538
539   stream->priv->closing = TRUE;
540
541   if (cancellable)
542     g_cancellable_push_current (cancellable);
543
544   if (class->flush)
545     res = class->flush (stream, cancellable, error);
546   else
547     res = TRUE;
548
549   if (!res)
550     {
551       /* flushing caused the error that we want to return,
552        * but we still want to close the underlying stream if possible
553        */
554       if (class->close_fn)
555         class->close_fn (stream, cancellable, NULL);
556     }
557   else
558     {
559       res = TRUE;
560       if (class->close_fn)
561         res = class->close_fn (stream, cancellable, error);
562     }
563
564   if (cancellable)
565     g_cancellable_pop_current (cancellable);
566
567   stream->priv->closing = FALSE;
568   stream->priv->closed = TRUE;
569
570   return res;
571 }
572
573 /**
574  * g_output_stream_close:
575  * @stream: A #GOutputStream.
576  * @cancellable: (allow-none): optional cancellable object
577  * @error: location to store the error occurring, or %NULL to ignore
578  *
579  * Closes the stream, releasing resources related to it.
580  *
581  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
582  * Closing a stream multiple times will not return an error.
583  *
584  * Closing a stream will automatically flush any outstanding buffers in the
585  * stream.
586  *
587  * Streams will be automatically closed when the last reference
588  * is dropped, but you might want to call this function to make sure 
589  * resources are released as early as possible.
590  *
591  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
592  * open after the stream is closed. See the documentation for the individual
593  * stream for details.
594  *
595  * On failure the first error that happened will be reported, but the close
596  * operation will finish as much as possible. A stream that failed to
597  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
598  * is important to check and report the error to the user, otherwise
599  * there might be a loss of data as all data might not be written.
600  * 
601  * If @cancellable is not %NULL, then the operation can be cancelled by
602  * triggering the cancellable object from another thread. If the operation
603  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
604  * Cancelling a close will still leave the stream closed, but there some streams
605  * can use a faster close that doesn't block to e.g. check errors. On
606  * cancellation (as with any error) there is no guarantee that all written
607  * data will reach the target. 
608  *
609  * Return value: %TRUE on success, %FALSE on failure
610  **/
611 gboolean
612 g_output_stream_close (GOutputStream  *stream,
613                        GCancellable   *cancellable,
614                        GError        **error)
615 {
616   gboolean res;
617
618   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
619
620   if (stream->priv->closed)
621     return TRUE;
622
623   if (!g_output_stream_set_pending (stream, error))
624     return FALSE;
625
626   res = _g_output_stream_close_internal (stream, cancellable, error);
627
628   g_output_stream_clear_pending (stream);
629   
630   return res;
631 }
632
633 static void
634 async_ready_write_callback_wrapper (GObject      *source_object,
635                                     GAsyncResult *res,
636                                     gpointer      user_data)
637 {
638   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
639   GOutputStreamClass *class;
640   GTask *task = user_data;
641   gssize nwrote;
642   GError *error = NULL;
643
644   g_output_stream_clear_pending (stream);
645   
646   if (g_async_result_legacy_propagate_error (res, &error))
647     nwrote = -1;
648   else
649     {
650       class = G_OUTPUT_STREAM_GET_CLASS (stream);
651       nwrote = class->write_finish (stream, res, &error);
652     }
653
654   if (nwrote >= 0)
655     g_task_return_int (task, nwrote);
656   else
657     g_task_return_error (task, error);
658   g_object_unref (task);
659 }
660
661 /**
662  * g_output_stream_write_async:
663  * @stream: A #GOutputStream.
664  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
665  * @count: the number of bytes to write
666  * @io_priority: the io priority of the request.
667  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
668  * @callback: (scope async): callback to call when the request is satisfied
669  * @user_data: (closure): the data to pass to callback function
670  *
671  * Request an asynchronous write of @count bytes from @buffer into 
672  * the stream. When the operation is finished @callback will be called.
673  * You can then call g_output_stream_write_finish() to get the result of the 
674  * operation.
675  *
676  * During an async request no other sync and async calls are allowed, 
677  * and will result in %G_IO_ERROR_PENDING errors. 
678  *
679  * A value of @count larger than %G_MAXSSIZE will cause a 
680  * %G_IO_ERROR_INVALID_ARGUMENT error.
681  *
682  * On success, the number of bytes written will be passed to the
683  * @callback. It is not an error if this is not the same as the 
684  * requested size, as it can happen e.g. on a partial I/O error, 
685  * but generally we try to write as many bytes as requested. 
686  *
687  * You are guaranteed that this method will never fail with
688  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
689  * method will just wait until this changes.
690  *
691  * Any outstanding I/O request with higher priority (lower numerical 
692  * value) will be executed before an outstanding request with lower 
693  * priority. Default priority is %G_PRIORITY_DEFAULT.
694  *
695  * The asyncronous methods have a default fallback that uses threads 
696  * to implement asynchronicity, so they are optional for inheriting 
697  * classes. However, if you override one you must override all.
698  *
699  * For the synchronous, blocking version of this function, see 
700  * g_output_stream_write().
701  **/
702 void
703 g_output_stream_write_async (GOutputStream       *stream,
704                              const void          *buffer,
705                              gsize                count,
706                              int                  io_priority,
707                              GCancellable        *cancellable,
708                              GAsyncReadyCallback  callback,
709                              gpointer             user_data)
710 {
711   GOutputStreamClass *class;
712   GError *error = NULL;
713   GTask *task;
714
715   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
716   g_return_if_fail (buffer != NULL);
717
718   task = g_task_new (stream, cancellable, callback, user_data);
719   g_task_set_source_tag (task, g_output_stream_write_async);
720   g_task_set_priority (task, io_priority);
721
722   if (count == 0)
723     {
724       g_task_return_int (task, 0);
725       g_object_unref (task);
726       return;
727     }
728
729   if (((gssize) count) < 0)
730     {
731       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
732                                _("Too large count value passed to %s"),
733                                G_STRFUNC);
734       g_object_unref (task);
735       return;
736     }
737
738   if (!g_output_stream_set_pending (stream, &error))
739     {
740       g_task_return_error (task, error);
741       g_object_unref (task);
742       return;
743     }
744   
745   class = G_OUTPUT_STREAM_GET_CLASS (stream);
746
747   class->write_async (stream, buffer, count, io_priority, cancellable,
748                       async_ready_write_callback_wrapper, task);
749 }
750
751 /**
752  * g_output_stream_write_finish:
753  * @stream: a #GOutputStream.
754  * @result: a #GAsyncResult.
755  * @error: a #GError location to store the error occurring, or %NULL to 
756  * ignore.
757  * 
758  * Finishes a stream write operation.
759  * 
760  * Returns: a #gssize containing the number of bytes written to the stream.
761  **/
762 gssize
763 g_output_stream_write_finish (GOutputStream  *stream,
764                               GAsyncResult   *result,
765                               GError        **error)
766 {
767   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
768   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
769   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_write_async), FALSE);
770
771   /* @result is always the GTask created by g_output_stream_write_async();
772    * we called class->write_finish() from async_ready_write_callback_wrapper.
773    */
774   return g_task_propagate_int (G_TASK (result), error);
775 }
776
777 static void
778 write_bytes_callback (GObject      *stream,
779                       GAsyncResult *result,
780                       gpointer      user_data)
781 {
782   GTask *task = user_data;
783   GError *error = NULL;
784   gssize nwrote;
785
786   nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream),
787                                          result, &error);
788   if (nwrote == -1)
789     g_task_return_error (task, error);
790   else
791     g_task_return_int (task, nwrote);
792   g_object_unref (task);
793 }
794
795 /**
796  * g_output_stream_write_bytes_async:
797  * @stream: A #GOutputStream.
798  * @bytes: The bytes to write
799  * @io_priority: the io priority of the request.
800  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
801  * @callback: (scope async): callback to call when the request is satisfied
802  * @user_data: (closure): the data to pass to callback function
803  *
804  * Request an asynchronous write of the data in @bytes to the stream.
805  * When the operation is finished @callback will be called. You can
806  * then call g_output_stream_write_bytes_finish() to get the result of
807  * the operation.
808  *
809  * During an async request no other sync and async calls are allowed,
810  * and will result in %G_IO_ERROR_PENDING errors.
811  *
812  * A #GBytes larger than %G_MAXSSIZE will cause a
813  * %G_IO_ERROR_INVALID_ARGUMENT error.
814  *
815  * On success, the number of bytes written will be passed to the
816  * @callback. It is not an error if this is not the same as the
817  * requested size, as it can happen e.g. on a partial I/O error,
818  * but generally we try to write as many bytes as requested.
819  *
820  * You are guaranteed that this method will never fail with
821  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
822  * method will just wait until this changes.
823  *
824  * Any outstanding I/O request with higher priority (lower numerical
825  * value) will be executed before an outstanding request with lower
826  * priority. Default priority is %G_PRIORITY_DEFAULT.
827  *
828  * For the synchronous, blocking version of this function, see
829  * g_output_stream_write_bytes().
830  **/
831 void
832 g_output_stream_write_bytes_async (GOutputStream       *stream,
833                                    GBytes              *bytes,
834                                    int                  io_priority,
835                                    GCancellable        *cancellable,
836                                    GAsyncReadyCallback  callback,
837                                    gpointer             user_data)
838 {
839   GTask *task;
840   gsize size;
841   gconstpointer data;
842
843   data = g_bytes_get_data (bytes, &size);
844
845   task = g_task_new (stream, cancellable, callback, user_data);
846   g_task_set_task_data (task, g_bytes_ref (bytes),
847                         (GDestroyNotify) g_bytes_unref);
848
849   g_output_stream_write_async (stream,
850                                data, size,
851                                io_priority,
852                                cancellable,
853                                write_bytes_callback,
854                                task);
855 }
856
857 /**
858  * g_output_stream_write_bytes_finish:
859  * @stream: a #GOutputStream.
860  * @result: a #GAsyncResult.
861  * @error: a #GError location to store the error occurring, or %NULL to
862  * ignore.
863  *
864  * Finishes a stream write-from-#GBytes operation.
865  *
866  * Returns: a #gssize containing the number of bytes written to the stream.
867  **/
868 gssize
869 g_output_stream_write_bytes_finish (GOutputStream  *stream,
870                                     GAsyncResult   *result,
871                                     GError        **error)
872 {
873   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
874   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
875
876   return g_task_propagate_int (G_TASK (result), error);
877 }
878
879 static void
880 async_ready_splice_callback_wrapper (GObject      *source_object,
881                                      GAsyncResult *res,
882                                      gpointer     _data)
883 {
884   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
885   GOutputStreamClass *class;
886   GTask *task = _data;
887   gssize nspliced;
888   GError *error = NULL;
889
890   g_output_stream_clear_pending (stream);
891   
892   if (g_async_result_legacy_propagate_error (res, &error))
893     nspliced = -1;
894   else
895     {
896       class = G_OUTPUT_STREAM_GET_CLASS (stream);
897       nspliced = class->splice_finish (stream, res, &error);
898     }
899
900   if (nspliced >= 0)
901     g_task_return_int (task, nspliced);
902   else
903     g_task_return_error (task, error);
904   g_object_unref (task);
905 }
906
907 /**
908  * g_output_stream_splice_async:
909  * @stream: a #GOutputStream.
910  * @source: a #GInputStream. 
911  * @flags: a set of #GOutputStreamSpliceFlags.
912  * @io_priority: the io priority of the request.
913  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
914  * @callback: (scope async): a #GAsyncReadyCallback. 
915  * @user_data: (closure): user data passed to @callback.
916  * 
917  * Splices a stream asynchronously.
918  * When the operation is finished @callback will be called.
919  * You can then call g_output_stream_splice_finish() to get the 
920  * result of the operation.
921  *
922  * For the synchronous, blocking version of this function, see 
923  * g_output_stream_splice().
924  **/
925 void
926 g_output_stream_splice_async (GOutputStream            *stream,
927                               GInputStream             *source,
928                               GOutputStreamSpliceFlags  flags,
929                               int                       io_priority,
930                               GCancellable             *cancellable,
931                               GAsyncReadyCallback       callback,
932                               gpointer                  user_data)
933 {
934   GOutputStreamClass *class;
935   GTask *task;
936   GError *error = NULL;
937
938   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
939   g_return_if_fail (G_IS_INPUT_STREAM (source));
940
941   task = g_task_new (stream, cancellable, callback, user_data);
942   g_task_set_source_tag (task, g_output_stream_splice_async);
943   g_task_set_priority (task, io_priority);
944   g_task_set_task_data (task, g_object_ref (source), g_object_unref);
945
946   if (g_input_stream_is_closed (source))
947     {
948       g_task_return_new_error (task,
949                                G_IO_ERROR, G_IO_ERROR_CLOSED,
950                                _("Source stream is already closed"));
951       g_object_unref (task);
952       return;
953     }
954   
955   if (!g_output_stream_set_pending (stream, &error))
956     {
957       g_task_return_error (task, error);
958       g_object_unref (task);
959       return;
960     }
961
962   class = G_OUTPUT_STREAM_GET_CLASS (stream);
963
964   class->splice_async (stream, source, flags, io_priority, cancellable,
965                        async_ready_splice_callback_wrapper, task);
966 }
967
968 /**
969  * g_output_stream_splice_finish:
970  * @stream: a #GOutputStream.
971  * @result: a #GAsyncResult.
972  * @error: a #GError location to store the error occurring, or %NULL to 
973  * ignore.
974  *
975  * Finishes an asynchronous stream splice operation.
976  * 
977  * Returns: a #gssize of the number of bytes spliced. Note that if the
978  *     number of bytes spliced is greater than %G_MAXSSIZE, then that
979  *     will be returned, and there is no way to determine the actual
980  *     number of bytes spliced.
981  **/
982 gssize
983 g_output_stream_splice_finish (GOutputStream  *stream,
984                                GAsyncResult   *result,
985                                GError        **error)
986 {
987   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
988   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
989   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_splice_async), FALSE);
990
991   /* @result is always the GTask created by g_output_stream_splice_async();
992    * we called class->splice_finish() from async_ready_splice_callback_wrapper.
993    */
994   return g_task_propagate_int (G_TASK (result), error);
995 }
996
997 static void
998 async_ready_flush_callback_wrapper (GObject      *source_object,
999                                     GAsyncResult *res,
1000                                     gpointer      user_data)
1001 {
1002   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1003   GOutputStreamClass *class;
1004   GTask *task = user_data;
1005   gboolean flushed;
1006   GError *error = NULL;
1007
1008   g_output_stream_clear_pending (stream);
1009   
1010   if (g_async_result_legacy_propagate_error (res, &error))
1011     flushed = FALSE;
1012   else
1013     {
1014       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1015       flushed = class->flush_finish (stream, res, &error);
1016     }
1017
1018   if (flushed)
1019     g_task_return_boolean (task, TRUE);
1020   else
1021     g_task_return_error (task, error);
1022   g_object_unref (task);
1023 }
1024
1025 /**
1026  * g_output_stream_flush_async:
1027  * @stream: a #GOutputStream.
1028  * @io_priority: the io priority of the request.
1029  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
1030  * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied
1031  * @user_data: (closure): the data to pass to callback function
1032  * 
1033  * Forces an asynchronous write of all user-space buffered data for
1034  * the given @stream.
1035  * For behaviour details see g_output_stream_flush().
1036  *
1037  * When the operation is finished @callback will be 
1038  * called. You can then call g_output_stream_flush_finish() to get the 
1039  * result of the operation.
1040  **/
1041 void
1042 g_output_stream_flush_async (GOutputStream       *stream,
1043                              int                  io_priority,
1044                              GCancellable        *cancellable,
1045                              GAsyncReadyCallback  callback,
1046                              gpointer             user_data)
1047 {
1048   GOutputStreamClass *class;
1049   GTask *task;
1050   GError *error = NULL;
1051
1052   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1053
1054   task = g_task_new (stream, cancellable, callback, user_data);
1055   g_task_set_source_tag (task, g_output_stream_flush_async);
1056   g_task_set_priority (task, io_priority);
1057
1058   if (!g_output_stream_set_pending (stream, &error))
1059     {
1060       g_task_return_error (task, error);
1061       g_object_unref (task);
1062       return;
1063     }
1064
1065   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1066   
1067   if (class->flush_async == NULL)
1068     {
1069       g_task_return_boolean (task, TRUE);
1070       g_object_unref (task);
1071       return;
1072     }
1073       
1074   class->flush_async (stream, io_priority, cancellable,
1075                       async_ready_flush_callback_wrapper, task);
1076 }
1077
1078 /**
1079  * g_output_stream_flush_finish:
1080  * @stream: a #GOutputStream.
1081  * @result: a GAsyncResult.
1082  * @error: a #GError location to store the error occurring, or %NULL to 
1083  * ignore.
1084  * 
1085  * Finishes flushing an output stream.
1086  * 
1087  * Returns: %TRUE if flush operation succeeded, %FALSE otherwise.
1088  **/
1089 gboolean
1090 g_output_stream_flush_finish (GOutputStream  *stream,
1091                               GAsyncResult   *result,
1092                               GError        **error)
1093 {
1094   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1095   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1096   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_flush_async), FALSE);
1097
1098   /* @result is always the GTask created by g_output_stream_flush_async();
1099    * we called class->flush_finish() from async_ready_flush_callback_wrapper.
1100    */
1101   return g_task_propagate_boolean (G_TASK (result), error);
1102 }
1103
1104
1105 static void
1106 async_ready_close_callback_wrapper (GObject      *source_object,
1107                                     GAsyncResult *res,
1108                                     gpointer      user_data)
1109 {
1110   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1111   GOutputStreamClass *class;
1112   GTask *task = user_data;
1113   GError *error = g_task_get_task_data (task);
1114
1115   stream->priv->closing = FALSE;
1116   stream->priv->closed = TRUE;
1117
1118   if (!error && !g_async_result_legacy_propagate_error (res, &error))
1119     {
1120       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1121
1122       class->close_finish (stream, res,
1123                            error ? NULL : &error);
1124     }
1125
1126   g_output_stream_clear_pending (stream);
1127
1128   if (error != NULL)
1129     g_task_return_error (task, error);
1130   else
1131     g_task_return_boolean (task, TRUE);
1132   g_object_unref (task);
1133 }
1134
1135 static void
1136 async_ready_close_flushed_callback_wrapper (GObject      *source_object,
1137                                             GAsyncResult *res,
1138                                             gpointer      user_data)
1139 {
1140   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1141   GOutputStreamClass *class;
1142   GTask *task = user_data;
1143   GError *error = NULL;
1144
1145   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1146
1147   if (!g_async_result_legacy_propagate_error (res, &error))
1148     {
1149       class->flush_finish (stream, res, &error);
1150     }
1151
1152   /* propagate the possible error */
1153   if (error)
1154     g_task_set_task_data (task, error, NULL);
1155
1156   /* we still close, even if there was a flush error */
1157   class->close_async (stream,
1158                       g_task_get_priority (task),
1159                       g_task_get_cancellable (task),
1160                       async_ready_close_callback_wrapper, task);
1161 }
1162
1163 /**
1164  * g_output_stream_close_async:
1165  * @stream: A #GOutputStream.
1166  * @io_priority: the io priority of the request.
1167  * @cancellable: (allow-none): optional cancellable object
1168  * @callback: (scope async): callback to call when the request is satisfied
1169  * @user_data: (closure): the data to pass to callback function
1170  *
1171  * Requests an asynchronous close of the stream, releasing resources 
1172  * related to it. When the operation is finished @callback will be 
1173  * called. You can then call g_output_stream_close_finish() to get 
1174  * the result of the operation.
1175  *
1176  * For behaviour details see g_output_stream_close().
1177  *
1178  * The asyncronous methods have a default fallback that uses threads 
1179  * to implement asynchronicity, so they are optional for inheriting 
1180  * classes. However, if you override one you must override all.
1181  **/
1182 void
1183 g_output_stream_close_async (GOutputStream       *stream,
1184                              int                  io_priority,
1185                              GCancellable        *cancellable,
1186                              GAsyncReadyCallback  callback,
1187                              gpointer             user_data)
1188 {
1189   GOutputStreamClass *class;
1190   GTask *task;
1191   GError *error = NULL;
1192
1193   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1194   
1195   task = g_task_new (stream, cancellable, callback, user_data);
1196   g_task_set_source_tag (task, g_output_stream_close_async);
1197   g_task_set_priority (task, io_priority);
1198
1199   if (stream->priv->closed)
1200     {
1201       g_task_return_boolean (task, TRUE);
1202       g_object_unref (task);
1203       return;
1204     }
1205
1206   if (!g_output_stream_set_pending (stream, &error))
1207     {
1208       g_task_return_error (task, error);
1209       g_object_unref (task);
1210       return;
1211     }
1212   
1213   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1214   stream->priv->closing = TRUE;
1215
1216   /* Call close_async directly if there is no need to flush, or if the flush
1217      can be done sync (in the output stream async close thread) */
1218   if (class->flush_async == NULL ||
1219       (class->flush_async == g_output_stream_real_flush_async &&
1220        (class->flush == NULL || class->close_async == g_output_stream_real_close_async)))
1221     {
1222       class->close_async (stream, io_priority, cancellable,
1223                           async_ready_close_callback_wrapper, task);
1224     }
1225   else
1226     {
1227       /* First do an async flush, then do the async close in the callback
1228          wrapper (see async_ready_close_flushed_callback_wrapper) */
1229       class->flush_async (stream, io_priority, cancellable,
1230                           async_ready_close_flushed_callback_wrapper, task);
1231     }
1232 }
1233
1234 /**
1235  * g_output_stream_close_finish:
1236  * @stream: a #GOutputStream.
1237  * @result: a #GAsyncResult.
1238  * @error: a #GError location to store the error occurring, or %NULL to 
1239  * ignore.
1240  * 
1241  * Closes an output stream.
1242  * 
1243  * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
1244  **/
1245 gboolean
1246 g_output_stream_close_finish (GOutputStream  *stream,
1247                               GAsyncResult   *result,
1248                               GError        **error)
1249 {
1250   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1251   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1252   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_close_async), FALSE);
1253
1254   /* @result is always the GTask created by g_output_stream_close_async();
1255    * we called class->close_finish() from async_ready_close_callback_wrapper.
1256    */
1257   return g_task_propagate_boolean (G_TASK (result), error);
1258 }
1259
1260 /**
1261  * g_output_stream_is_closed:
1262  * @stream: a #GOutputStream.
1263  * 
1264  * Checks if an output stream has already been closed.
1265  * 
1266  * Returns: %TRUE if @stream is closed. %FALSE otherwise. 
1267  **/
1268 gboolean
1269 g_output_stream_is_closed (GOutputStream *stream)
1270 {
1271   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1272   
1273   return stream->priv->closed;
1274 }
1275
1276 /**
1277  * g_output_stream_is_closing:
1278  * @stream: a #GOutputStream.
1279  *
1280  * Checks if an output stream is being closed. This can be
1281  * used inside e.g. a flush implementation to see if the
1282  * flush (or other i/o operation) is called from within
1283  * the closing operation.
1284  *
1285  * Returns: %TRUE if @stream is being closed. %FALSE otherwise.
1286  *
1287  * Since: 2.24
1288  **/
1289 gboolean
1290 g_output_stream_is_closing (GOutputStream *stream)
1291 {
1292   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1293
1294   return stream->priv->closing;
1295 }
1296
1297 /**
1298  * g_output_stream_has_pending:
1299  * @stream: a #GOutputStream.
1300  * 
1301  * Checks if an ouput stream has pending actions.
1302  * 
1303  * Returns: %TRUE if @stream has pending actions. 
1304  **/
1305 gboolean
1306 g_output_stream_has_pending (GOutputStream *stream)
1307 {
1308   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1309   
1310   return stream->priv->pending;
1311 }
1312
1313 /**
1314  * g_output_stream_set_pending:
1315  * @stream: a #GOutputStream.
1316  * @error: a #GError location to store the error occurring, or %NULL to 
1317  * ignore.
1318  * 
1319  * Sets @stream to have actions pending. If the pending flag is
1320  * already set or @stream is closed, it will return %FALSE and set
1321  * @error.
1322  *
1323  * Return value: %TRUE if pending was previously unset and is now set.
1324  **/
1325 gboolean
1326 g_output_stream_set_pending (GOutputStream *stream,
1327                              GError **error)
1328 {
1329   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1330   
1331   if (stream->priv->closed)
1332     {
1333       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1334                            _("Stream is already closed"));
1335       return FALSE;
1336     }
1337   
1338   if (stream->priv->pending)
1339     {
1340       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1341                            /* Translators: This is an error you get if there is
1342                             * already an operation running against this stream when
1343                             * you try to start one */
1344                            _("Stream has outstanding operation"));
1345       return FALSE;
1346     }
1347   
1348   stream->priv->pending = TRUE;
1349   return TRUE;
1350 }
1351
1352 /**
1353  * g_output_stream_clear_pending:
1354  * @stream: output stream
1355  * 
1356  * Clears the pending flag on @stream.
1357  **/
1358 void
1359 g_output_stream_clear_pending (GOutputStream *stream)
1360 {
1361   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1362   
1363   stream->priv->pending = FALSE;
1364 }
1365
1366
1367 /********************************************
1368  *   Default implementation of async ops    *
1369  ********************************************/
1370
1371 typedef struct {
1372   const void         *buffer;
1373   gsize               count_requested;
1374   gssize              count_written;
1375 } WriteData;
1376
1377 static void
1378 free_write_data (WriteData *op)
1379 {
1380   g_slice_free (WriteData, op);
1381 }
1382
1383 static void
1384 write_async_thread (GTask        *task,
1385                     gpointer      source_object,
1386                     gpointer      task_data,
1387                     GCancellable *cancellable)
1388 {
1389   GOutputStream *stream = source_object;
1390   WriteData *op = task_data;
1391   GOutputStreamClass *class;
1392   GError *error = NULL;
1393   gssize count_written;
1394
1395   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1396   count_written = class->write_fn (stream, op->buffer, op->count_requested,
1397                                    cancellable, &error);
1398   if (count_written == -1)
1399     g_task_return_error (task, error);
1400   else
1401     g_task_return_int (task, count_written);
1402 }
1403
1404 static void write_async_pollable (GPollableOutputStream *stream,
1405                                   GTask                 *task);
1406
1407 static gboolean
1408 write_async_pollable_ready (GPollableOutputStream *stream,
1409                             gpointer               user_data)
1410 {
1411   GTask *task = user_data;
1412
1413   write_async_pollable (stream, task);
1414   return FALSE;
1415 }
1416
1417 static void
1418 write_async_pollable (GPollableOutputStream *stream,
1419                       GTask                 *task)
1420 {
1421   GError *error = NULL;
1422   WriteData *op = g_task_get_task_data (task);
1423   gssize count_written;
1424
1425   if (g_task_return_error_if_cancelled (task))
1426     return;
1427
1428   count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
1429     write_nonblocking (stream, op->buffer, op->count_requested, &error);
1430
1431   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1432     {
1433       GSource *source;
1434
1435       g_error_free (error);
1436
1437       source = g_pollable_output_stream_create_source (stream,
1438                                                        g_task_get_cancellable (task));
1439       g_task_attach_source (task, source,
1440                             (GSourceFunc) write_async_pollable_ready);
1441       g_source_unref (source);
1442       return;
1443     }
1444
1445   if (count_written == -1)
1446     g_task_return_error (task, error);
1447   else
1448     g_task_return_int (task, count_written);
1449 }
1450
1451 static void
1452 g_output_stream_real_write_async (GOutputStream       *stream,
1453                                   const void          *buffer,
1454                                   gsize                count,
1455                                   int                  io_priority,
1456                                   GCancellable        *cancellable,
1457                                   GAsyncReadyCallback  callback,
1458                                   gpointer             user_data)
1459 {
1460   GTask *task;
1461   WriteData *op;
1462
1463   op = g_slice_new0 (WriteData);
1464   task = g_task_new (stream, cancellable, callback, user_data);
1465   g_task_set_check_cancellable (task, FALSE);
1466   g_task_set_task_data (task, op, (GDestroyNotify) free_write_data);
1467   op->buffer = buffer;
1468   op->count_requested = count;
1469
1470   if (G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
1471       g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream)))
1472     write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
1473   else
1474     g_task_run_in_thread (task, write_async_thread);
1475   g_object_unref (task);
1476 }
1477
1478 static gssize
1479 g_output_stream_real_write_finish (GOutputStream  *stream,
1480                                    GAsyncResult   *result,
1481                                    GError        **error)
1482 {
1483   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1484
1485   return g_task_propagate_int (G_TASK (result), error);
1486 }
1487
1488 typedef struct {
1489   GInputStream *source;
1490   GOutputStreamSpliceFlags flags;
1491 } SpliceData;
1492
1493 static void
1494 free_splice_data (SpliceData *op)
1495 {
1496   g_object_unref (op->source);
1497   g_free (op);
1498 }
1499
1500 static void
1501 splice_async_thread (GTask        *task,
1502                      gpointer      source_object,
1503                      gpointer      task_data,
1504                      GCancellable *cancellable)
1505 {
1506   GOutputStream *stream = source_object;
1507   SpliceData *op = task_data;
1508   GOutputStreamClass *class;
1509   GError *error = NULL;
1510   gssize bytes_copied;
1511
1512   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1513   
1514   bytes_copied = class->splice (stream,
1515                                 op->source,
1516                                 op->flags,
1517                                 cancellable,
1518                                 &error);
1519   if (bytes_copied == -1)
1520     g_task_return_error (task, error);
1521   else
1522     g_task_return_int (task, bytes_copied);
1523 }
1524
1525 static void
1526 g_output_stream_real_splice_async (GOutputStream             *stream,
1527                                    GInputStream              *source,
1528                                    GOutputStreamSpliceFlags   flags,
1529                                    int                        io_priority,
1530                                    GCancellable              *cancellable,
1531                                    GAsyncReadyCallback        callback,
1532                                    gpointer                   user_data)
1533 {
1534   GTask *task;
1535   SpliceData *op;
1536
1537   op = g_new0 (SpliceData, 1);
1538   task = g_task_new (stream, cancellable, callback, user_data);
1539   g_task_set_task_data (task, op, (GDestroyNotify)free_splice_data);
1540   op->flags = flags;
1541   op->source = g_object_ref (source);
1542
1543   /* TODO: In the case where both source and destintion have
1544      non-threadbased async calls we can use a true async copy here */
1545   
1546   g_task_run_in_thread (task, splice_async_thread);
1547   g_object_unref (task);
1548 }
1549
1550 static gssize
1551 g_output_stream_real_splice_finish (GOutputStream  *stream,
1552                                     GAsyncResult   *result,
1553                                     GError        **error)
1554 {
1555   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1556
1557   return g_task_propagate_int (G_TASK (result), error);
1558 }
1559
1560
1561 static void
1562 flush_async_thread (GTask        *task,
1563                     gpointer      source_object,
1564                     gpointer      task_data,
1565                     GCancellable *cancellable)
1566 {
1567   GOutputStream *stream = source_object;
1568   GOutputStreamClass *class;
1569   gboolean result;
1570   GError *error = NULL;
1571
1572   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1573   result = TRUE;
1574   if (class->flush)
1575     result = class->flush (stream, cancellable, &error);
1576
1577   if (result)
1578     g_task_return_boolean (task, TRUE);
1579   else
1580     g_task_return_error (task, error);
1581 }
1582
1583 static void
1584 g_output_stream_real_flush_async (GOutputStream       *stream,
1585                                   int                  io_priority,
1586                                   GCancellable        *cancellable,
1587                                   GAsyncReadyCallback  callback,
1588                                   gpointer             user_data)
1589 {
1590   GTask *task;
1591
1592   task = g_task_new (stream, cancellable, callback, user_data);
1593   g_task_set_priority (task, io_priority);
1594   g_task_run_in_thread (task, flush_async_thread);
1595   g_object_unref (task);
1596 }
1597
1598 static gboolean
1599 g_output_stream_real_flush_finish (GOutputStream  *stream,
1600                                    GAsyncResult   *result,
1601                                    GError        **error)
1602 {
1603   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1604
1605   return g_task_propagate_boolean (G_TASK (result), error);
1606 }
1607
1608 static void
1609 close_async_thread (GTask        *task,
1610                     gpointer      source_object,
1611                     gpointer      task_data,
1612                     GCancellable *cancellable)
1613 {
1614   GOutputStream *stream = source_object;
1615   GOutputStreamClass *class;
1616   GError *error = NULL;
1617   gboolean result = TRUE;
1618
1619   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1620
1621   /* Do a flush here if there is a flush function, and we did not have to do
1622    * an async flush before (see g_output_stream_close_async)
1623    */
1624   if (class->flush != NULL &&
1625       (class->flush_async == NULL ||
1626        class->flush_async == g_output_stream_real_flush_async))
1627     {
1628       result = class->flush (stream, cancellable, &error);
1629     }
1630
1631   /* Auto handling of cancelation disabled, and ignore
1632      cancellation, since we want to close things anyway, although
1633      possibly in a quick-n-dirty way. At least we never want to leak
1634      open handles */
1635
1636   if (class->close_fn)
1637     {
1638       /* Make sure to close, even if the flush failed (see sync close) */
1639       if (!result)
1640         class->close_fn (stream, cancellable, NULL);
1641       else
1642         result = class->close_fn (stream, cancellable, &error);
1643     }
1644
1645   if (result)
1646     g_task_return_boolean (task, TRUE);
1647   else
1648     g_task_return_error (task, error);
1649 }
1650
1651 static void
1652 g_output_stream_real_close_async (GOutputStream       *stream,
1653                                   int                  io_priority,
1654                                   GCancellable        *cancellable,
1655                                   GAsyncReadyCallback  callback,
1656                                   gpointer             user_data)
1657 {
1658   GTask *task;
1659
1660   task = g_task_new (stream, cancellable, callback, user_data);
1661   g_task_set_priority (task, io_priority);
1662   g_task_run_in_thread (task, close_async_thread);
1663   g_object_unref (task);
1664 }
1665
1666 static gboolean
1667 g_output_stream_real_close_finish (GOutputStream  *stream,
1668                                    GAsyncResult   *result,
1669                                    GError        **error)
1670 {
1671   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1672
1673   return g_task_propagate_boolean (G_TASK (result), error);
1674 }