GMemoryInputStream: fix skip_async()
[platform/upstream/glib.git] / gio / goutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24 #include "goutputstream.h"
25 #include "gcancellable.h"
26 #include "gasyncresult.h"
27 #include "gtask.h"
28 #include "ginputstream.h"
29 #include "gioerror.h"
30 #include "glibintl.h"
31 #include "gpollableoutputstream.h"
32
33 /**
34  * SECTION:goutputstream
35  * @short_description: Base class for implementing streaming output
36  * @include: gio/gio.h
37  *
38  * #GOutputStream has functions to write to a stream (g_output_stream_write()),
39  * to close a stream (g_output_stream_close()) and to flush pending writes
40  * (g_output_stream_flush()). 
41  *
42  * To copy the content of an input stream to an output stream without 
43  * manually handling the reads and writes, use g_output_stream_splice(). 
44  *
45  * All of these functions have async variants too.
46  **/
47
48 G_DEFINE_ABSTRACT_TYPE (GOutputStream, g_output_stream, G_TYPE_OBJECT);
49
50 struct _GOutputStreamPrivate {
51   guint closed : 1;
52   guint pending : 1;
53   guint closing : 1;
54   GAsyncReadyCallback outstanding_callback;
55 };
56
57 static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
58                                                     GInputStream              *source,
59                                                     GOutputStreamSpliceFlags   flags,
60                                                     GCancellable              *cancellable,
61                                                     GError                   **error);
62 static void     g_output_stream_real_write_async   (GOutputStream             *stream,
63                                                     const void                *buffer,
64                                                     gsize                      count,
65                                                     int                        io_priority,
66                                                     GCancellable              *cancellable,
67                                                     GAsyncReadyCallback        callback,
68                                                     gpointer                   data);
69 static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
70                                                     GAsyncResult              *result,
71                                                     GError                   **error);
72 static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
73                                                     GInputStream              *source,
74                                                     GOutputStreamSpliceFlags   flags,
75                                                     int                        io_priority,
76                                                     GCancellable              *cancellable,
77                                                     GAsyncReadyCallback        callback,
78                                                     gpointer                   data);
79 static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
80                                                     GAsyncResult              *result,
81                                                     GError                   **error);
82 static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
83                                                     int                        io_priority,
84                                                     GCancellable              *cancellable,
85                                                     GAsyncReadyCallback        callback,
86                                                     gpointer                   data);
87 static gboolean g_output_stream_real_flush_finish  (GOutputStream             *stream,
88                                                     GAsyncResult              *result,
89                                                     GError                   **error);
90 static void     g_output_stream_real_close_async   (GOutputStream             *stream,
91                                                     int                        io_priority,
92                                                     GCancellable              *cancellable,
93                                                     GAsyncReadyCallback        callback,
94                                                     gpointer                   data);
95 static gboolean g_output_stream_real_close_finish  (GOutputStream             *stream,
96                                                     GAsyncResult              *result,
97                                                     GError                   **error);
98 static gboolean _g_output_stream_close_internal    (GOutputStream             *stream,
99                                                     GCancellable              *cancellable,
100                                                     GError                   **error);
101
102 static void
103 g_output_stream_finalize (GObject *object)
104 {
105   G_OBJECT_CLASS (g_output_stream_parent_class)->finalize (object);
106 }
107
108 static void
109 g_output_stream_dispose (GObject *object)
110 {
111   GOutputStream *stream;
112
113   stream = G_OUTPUT_STREAM (object);
114   
115   if (!stream->priv->closed)
116     g_output_stream_close (stream, NULL, NULL);
117
118   G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object);
119 }
120
121 static void
122 g_output_stream_class_init (GOutputStreamClass *klass)
123 {
124   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
125   
126   g_type_class_add_private (klass, sizeof (GOutputStreamPrivate));
127   
128   gobject_class->finalize = g_output_stream_finalize;
129   gobject_class->dispose = g_output_stream_dispose;
130
131   klass->splice = g_output_stream_real_splice;
132   
133   klass->write_async = g_output_stream_real_write_async;
134   klass->write_finish = g_output_stream_real_write_finish;
135   klass->splice_async = g_output_stream_real_splice_async;
136   klass->splice_finish = g_output_stream_real_splice_finish;
137   klass->flush_async = g_output_stream_real_flush_async;
138   klass->flush_finish = g_output_stream_real_flush_finish;
139   klass->close_async = g_output_stream_real_close_async;
140   klass->close_finish = g_output_stream_real_close_finish;
141 }
142
143 static void
144 g_output_stream_init (GOutputStream *stream)
145 {
146   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
147                                               G_TYPE_OUTPUT_STREAM,
148                                               GOutputStreamPrivate);
149 }
150
151 /**
152  * g_output_stream_write:
153  * @stream: a #GOutputStream.
154  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
155  * @count: the number of bytes to write
156  * @cancellable: (allow-none): optional cancellable object
157  * @error: location to store the error occurring, or %NULL to ignore
158  *
159  * Tries to write @count bytes from @buffer into the stream. Will block
160  * during the operation.
161  * 
162  * If count is 0, returns 0 and does nothing. A value of @count
163  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
164  *
165  * On success, the number of bytes written to the stream is returned.
166  * It is not an error if this is not the same as the requested size, as it
167  * can happen e.g. on a partial I/O error, or if there is not enough
168  * storage in the stream. All writes block until at least one byte
169  * is written or an error occurs; 0 is never returned (unless
170  * @count is 0).
171  * 
172  * If @cancellable is not %NULL, then the operation can be cancelled by
173  * triggering the cancellable object from another thread. If the operation
174  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
175  * operation was partially finished when the operation was cancelled the
176  * partial result will be returned, without an error.
177  *
178  * On error -1 is returned and @error is set accordingly.
179  * 
180  * Virtual: write_fn
181  *
182  * Return value: Number of bytes written, or -1 on error
183  **/
184 gssize
185 g_output_stream_write (GOutputStream  *stream,
186                        const void     *buffer,
187                        gsize           count,
188                        GCancellable   *cancellable,
189                        GError        **error)
190 {
191   GOutputStreamClass *class;
192   gssize res;
193
194   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
195   g_return_val_if_fail (buffer != NULL, 0);
196
197   if (count == 0)
198     return 0;
199   
200   if (((gssize) count) < 0)
201     {
202       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
203                    _("Too large count value passed to %s"), G_STRFUNC);
204       return -1;
205     }
206
207   class = G_OUTPUT_STREAM_GET_CLASS (stream);
208
209   if (class->write_fn == NULL) 
210     {
211       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
212                            _("Output stream doesn't implement write"));
213       return -1;
214     }
215   
216   if (!g_output_stream_set_pending (stream, error))
217     return -1;
218   
219   if (cancellable)
220     g_cancellable_push_current (cancellable);
221   
222   res = class->write_fn (stream, buffer, count, cancellable, error);
223   
224   if (cancellable)
225     g_cancellable_pop_current (cancellable);
226   
227   g_output_stream_clear_pending (stream);
228
229   return res; 
230 }
231
232 /**
233  * g_output_stream_write_all:
234  * @stream: a #GOutputStream.
235  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
236  * @count: the number of bytes to write
237  * @bytes_written: (out): location to store the number of bytes that was 
238  *     written to the stream
239  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
240  * @error: location to store the error occurring, or %NULL to ignore
241  *
242  * Tries to write @count bytes from @buffer into the stream. Will block
243  * during the operation.
244  * 
245  * This function is similar to g_output_stream_write(), except it tries to
246  * write as many bytes as requested, only stopping on an error.
247  *
248  * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
249  * is set to @count.
250  * 
251  * If there is an error during the operation %FALSE is returned and @error
252  * is set to indicate the error status, @bytes_written is updated to contain
253  * the number of bytes written into the stream before the error occurred.
254  *
255  * Return value: %TRUE on success, %FALSE if there was an error
256  **/
257 gboolean
258 g_output_stream_write_all (GOutputStream  *stream,
259                            const void     *buffer,
260                            gsize           count,
261                            gsize          *bytes_written,
262                            GCancellable   *cancellable,
263                            GError        **error)
264 {
265   gsize _bytes_written;
266   gssize res;
267
268   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
269   g_return_val_if_fail (buffer != NULL, FALSE);
270
271   _bytes_written = 0;
272   while (_bytes_written < count)
273     {
274       res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
275                                    cancellable, error);
276       if (res == -1)
277         {
278           if (bytes_written)
279             *bytes_written = _bytes_written;
280           return FALSE;
281         }
282       
283       if (res == 0)
284         g_warning ("Write returned zero without error");
285
286       _bytes_written += res;
287     }
288   
289   if (bytes_written)
290     *bytes_written = _bytes_written;
291
292   return TRUE;
293 }
294
295 /**
296  * g_output_stream_write_bytes:
297  * @stream: a #GOutputStream.
298  * @bytes: the #GBytes to write
299  * @cancellable: (allow-none): optional cancellable object
300  * @error: location to store the error occurring, or %NULL to ignore
301  *
302  * Tries to write the data from @bytes into the stream. Will block
303  * during the operation.
304  *
305  * If @bytes is 0-length, returns 0 and does nothing. A #GBytes larger
306  * than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
307  *
308  * On success, the number of bytes written to the stream is returned.
309  * It is not an error if this is not the same as the requested size, as it
310  * can happen e.g. on a partial I/O error, or if there is not enough
311  * storage in the stream. All writes block until at least one byte
312  * is written or an error occurs; 0 is never returned (unless
313  * the size of @bytes is 0).
314  *
315  * If @cancellable is not %NULL, then the operation can be cancelled by
316  * triggering the cancellable object from another thread. If the operation
317  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
318  * operation was partially finished when the operation was cancelled the
319  * partial result will be returned, without an error.
320  *
321  * On error -1 is returned and @error is set accordingly.
322  *
323  * Return value: Number of bytes written, or -1 on error
324  **/
325 gssize
326 g_output_stream_write_bytes (GOutputStream  *stream,
327                              GBytes         *bytes,
328                              GCancellable   *cancellable,
329                              GError        **error)
330 {
331   gsize size;
332   gconstpointer data;
333
334   data = g_bytes_get_data (bytes, &size);
335
336   return g_output_stream_write (stream,
337                                 data, size,
338                                 cancellable,
339                                 error);
340 }
341
342 /**
343  * g_output_stream_flush:
344  * @stream: a #GOutputStream.
345  * @cancellable: (allow-none): optional cancellable object
346  * @error: location to store the error occurring, or %NULL to ignore
347  *
348  * Forces a write of all user-space buffered data for the given
349  * @stream. Will block during the operation. Closing the stream will
350  * implicitly cause a flush.
351  *
352  * This function is optional for inherited classes.
353  * 
354  * If @cancellable is not %NULL, then the operation can be cancelled by
355  * triggering the cancellable object from another thread. If the operation
356  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
357  *
358  * Return value: %TRUE on success, %FALSE on error
359  **/
360 gboolean
361 g_output_stream_flush (GOutputStream  *stream,
362                        GCancellable   *cancellable,
363                        GError        **error)
364 {
365   GOutputStreamClass *class;
366   gboolean res;
367
368   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
369
370   if (!g_output_stream_set_pending (stream, error))
371     return FALSE;
372   
373   class = G_OUTPUT_STREAM_GET_CLASS (stream);
374
375   res = TRUE;
376   if (class->flush)
377     {
378       if (cancellable)
379         g_cancellable_push_current (cancellable);
380       
381       res = class->flush (stream, cancellable, error);
382       
383       if (cancellable)
384         g_cancellable_pop_current (cancellable);
385     }
386   
387   g_output_stream_clear_pending (stream);
388
389   return res;
390 }
391
392 /**
393  * g_output_stream_splice:
394  * @stream: a #GOutputStream.
395  * @source: a #GInputStream.
396  * @flags: a set of #GOutputStreamSpliceFlags.
397  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
398  * @error: a #GError location to store the error occurring, or %NULL to
399  * ignore.
400  *
401  * Splices an input stream into an output stream.
402  *
403  * Returns: a #gssize containing the size of the data spliced, or
404  *     -1 if an error occurred. Note that if the number of bytes
405  *     spliced is greater than %G_MAXSSIZE, then that will be
406  *     returned, and there is no way to determine the actual number
407  *     of bytes spliced.
408  **/
409 gssize
410 g_output_stream_splice (GOutputStream             *stream,
411                         GInputStream              *source,
412                         GOutputStreamSpliceFlags   flags,
413                         GCancellable              *cancellable,
414                         GError                   **error)
415 {
416   GOutputStreamClass *class;
417   gssize bytes_copied;
418
419   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
420   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
421
422   if (g_input_stream_is_closed (source))
423     {
424       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
425                            _("Source stream is already closed"));
426       return -1;
427     }
428
429   if (!g_output_stream_set_pending (stream, error))
430     return -1;
431
432   class = G_OUTPUT_STREAM_GET_CLASS (stream);
433
434   if (cancellable)
435     g_cancellable_push_current (cancellable);
436
437   bytes_copied = class->splice (stream, source, flags, cancellable, error);
438
439   if (cancellable)
440     g_cancellable_pop_current (cancellable);
441
442   g_output_stream_clear_pending (stream);
443
444   return bytes_copied;
445 }
446
447 static gssize
448 g_output_stream_real_splice (GOutputStream             *stream,
449                              GInputStream              *source,
450                              GOutputStreamSpliceFlags   flags,
451                              GCancellable              *cancellable,
452                              GError                   **error)
453 {
454   GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
455   gssize n_read, n_written;
456   gsize bytes_copied;
457   char buffer[8192], *p;
458   gboolean res;
459
460   bytes_copied = 0;
461   if (class->write_fn == NULL)
462     {
463       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
464                            _("Output stream doesn't implement write"));
465       res = FALSE;
466       goto notsupported;
467     }
468
469   res = TRUE;
470   do
471     {
472       n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
473       if (n_read == -1)
474         {
475           res = FALSE;
476           break;
477         }
478
479       if (n_read == 0)
480         break;
481
482       p = buffer;
483       while (n_read > 0)
484         {
485           n_written = class->write_fn (stream, p, n_read, cancellable, error);
486           if (n_written == -1)
487             {
488               res = FALSE;
489               break;
490             }
491
492           p += n_written;
493           n_read -= n_written;
494           bytes_copied += n_written;
495         }
496
497       if (bytes_copied > G_MAXSSIZE)
498         bytes_copied = G_MAXSSIZE;
499     }
500   while (res);
501
502  notsupported:
503   if (!res)
504     error = NULL; /* Ignore further errors */
505
506   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
507     {
508       /* Don't care about errors in source here */
509       g_input_stream_close (source, cancellable, NULL);
510     }
511
512   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
513     {
514       /* But write errors on close are bad! */
515       res = _g_output_stream_close_internal (stream, cancellable, error);
516     }
517
518   if (res)
519     return bytes_copied;
520
521   return -1;
522 }
523
524 /* Must always be called inside
525  * g_output_stream_set_pending()/g_output_stream_clear_pending(). */
526 static gboolean
527 _g_output_stream_close_internal (GOutputStream  *stream,
528                                  GCancellable   *cancellable,
529                                  GError        **error)
530 {
531   GOutputStreamClass *class;
532   gboolean res;
533
534   if (stream->priv->closed)
535     return TRUE;
536
537   class = G_OUTPUT_STREAM_GET_CLASS (stream);
538
539   stream->priv->closing = TRUE;
540
541   if (cancellable)
542     g_cancellable_push_current (cancellable);
543
544   if (class->flush)
545     res = class->flush (stream, cancellable, error);
546   else
547     res = TRUE;
548
549   if (!res)
550     {
551       /* flushing caused the error that we want to return,
552        * but we still want to close the underlying stream if possible
553        */
554       if (class->close_fn)
555         class->close_fn (stream, cancellable, NULL);
556     }
557   else
558     {
559       res = TRUE;
560       if (class->close_fn)
561         res = class->close_fn (stream, cancellable, error);
562     }
563
564   if (cancellable)
565     g_cancellable_pop_current (cancellable);
566
567   stream->priv->closing = FALSE;
568   stream->priv->closed = TRUE;
569
570   return res;
571 }
572
573 /**
574  * g_output_stream_close:
575  * @stream: A #GOutputStream.
576  * @cancellable: (allow-none): optional cancellable object
577  * @error: location to store the error occurring, or %NULL to ignore
578  *
579  * Closes the stream, releasing resources related to it.
580  *
581  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
582  * Closing a stream multiple times will not return an error.
583  *
584  * Closing a stream will automatically flush any outstanding buffers in the
585  * stream.
586  *
587  * Streams will be automatically closed when the last reference
588  * is dropped, but you might want to call this function to make sure 
589  * resources are released as early as possible.
590  *
591  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
592  * open after the stream is closed. See the documentation for the individual
593  * stream for details.
594  *
595  * On failure the first error that happened will be reported, but the close
596  * operation will finish as much as possible. A stream that failed to
597  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
598  * is important to check and report the error to the user, otherwise
599  * there might be a loss of data as all data might not be written.
600  * 
601  * If @cancellable is not %NULL, then the operation can be cancelled by
602  * triggering the cancellable object from another thread. If the operation
603  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
604  * Cancelling a close will still leave the stream closed, but there some streams
605  * can use a faster close that doesn't block to e.g. check errors. On
606  * cancellation (as with any error) there is no guarantee that all written
607  * data will reach the target. 
608  *
609  * Return value: %TRUE on success, %FALSE on failure
610  **/
611 gboolean
612 g_output_stream_close (GOutputStream  *stream,
613                        GCancellable   *cancellable,
614                        GError        **error)
615 {
616   gboolean res;
617
618   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
619
620   if (stream->priv->closed)
621     return TRUE;
622
623   if (!g_output_stream_set_pending (stream, error))
624     return FALSE;
625
626   res = _g_output_stream_close_internal (stream, cancellable, error);
627
628   g_output_stream_clear_pending (stream);
629   
630   return res;
631 }
632
633 static void
634 async_ready_write_callback_wrapper (GObject      *source_object,
635                                     GAsyncResult *res,
636                                     gpointer      user_data)
637 {
638   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
639   GOutputStreamClass *class;
640   GTask *task = user_data;
641   gssize nwrote;
642   GError *error = NULL;
643
644   g_output_stream_clear_pending (stream);
645   
646   if (g_async_result_legacy_propagate_error (res, &error))
647     nwrote = -1;
648   else
649     {
650       class = G_OUTPUT_STREAM_GET_CLASS (stream);
651       nwrote = class->write_finish (stream, res, &error);
652     }
653
654   if (nwrote >= 0)
655     g_task_return_int (task, nwrote);
656   else
657     g_task_return_error (task, error);
658   g_object_unref (task);
659 }
660
661 /**
662  * g_output_stream_write_async:
663  * @stream: A #GOutputStream.
664  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
665  * @count: the number of bytes to write
666  * @io_priority: the io priority of the request.
667  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
668  * @callback: (scope async): callback to call when the request is satisfied
669  * @user_data: (closure): the data to pass to callback function
670  *
671  * Request an asynchronous write of @count bytes from @buffer into 
672  * the stream. When the operation is finished @callback will be called.
673  * You can then call g_output_stream_write_finish() to get the result of the 
674  * operation.
675  *
676  * During an async request no other sync and async calls are allowed, 
677  * and will result in %G_IO_ERROR_PENDING errors. 
678  *
679  * A value of @count larger than %G_MAXSSIZE will cause a 
680  * %G_IO_ERROR_INVALID_ARGUMENT error.
681  *
682  * On success, the number of bytes written will be passed to the
683  * @callback. It is not an error if this is not the same as the 
684  * requested size, as it can happen e.g. on a partial I/O error, 
685  * but generally we try to write as many bytes as requested. 
686  *
687  * You are guaranteed that this method will never fail with
688  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
689  * method will just wait until this changes.
690  *
691  * Any outstanding I/O request with higher priority (lower numerical 
692  * value) will be executed before an outstanding request with lower 
693  * priority. Default priority is %G_PRIORITY_DEFAULT.
694  *
695  * The asyncronous methods have a default fallback that uses threads 
696  * to implement asynchronicity, so they are optional for inheriting 
697  * classes. However, if you override one you must override all.
698  *
699  * For the synchronous, blocking version of this function, see 
700  * g_output_stream_write().
701  **/
702 void
703 g_output_stream_write_async (GOutputStream       *stream,
704                              const void          *buffer,
705                              gsize                count,
706                              int                  io_priority,
707                              GCancellable        *cancellable,
708                              GAsyncReadyCallback  callback,
709                              gpointer             user_data)
710 {
711   GOutputStreamClass *class;
712   GError *error = NULL;
713   GTask *task;
714
715   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
716   g_return_if_fail (buffer != NULL);
717
718   task = g_task_new (stream, cancellable, callback, user_data);
719   g_task_set_source_tag (task, g_output_stream_write_async);
720   g_task_set_priority (task, io_priority);
721
722   if (count == 0)
723     {
724       g_task_return_int (task, 0);
725       g_object_unref (task);
726       return;
727     }
728
729   if (((gssize) count) < 0)
730     {
731       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
732                                _("Too large count value passed to %s"),
733                                G_STRFUNC);
734       g_object_unref (task);
735       return;
736     }
737
738   if (!g_output_stream_set_pending (stream, &error))
739     {
740       g_task_return_error (task, error);
741       g_object_unref (task);
742       return;
743     }
744   
745   class = G_OUTPUT_STREAM_GET_CLASS (stream);
746
747   class->write_async (stream, buffer, count, io_priority, cancellable,
748                       async_ready_write_callback_wrapper, task);
749 }
750
751 /**
752  * g_output_stream_write_finish:
753  * @stream: a #GOutputStream.
754  * @result: a #GAsyncResult.
755  * @error: a #GError location to store the error occurring, or %NULL to 
756  * ignore.
757  * 
758  * Finishes a stream write operation.
759  * 
760  * Returns: a #gssize containing the number of bytes written to the stream.
761  **/
762 gssize
763 g_output_stream_write_finish (GOutputStream  *stream,
764                               GAsyncResult   *result,
765                               GError        **error)
766 {
767   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
768   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
769   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_write_async), FALSE);
770
771   /* @result is always the GTask created by g_output_stream_write_async();
772    * we called class->write_finish() from async_ready_write_callback_wrapper.
773    */
774   return g_task_propagate_int (G_TASK (result), error);
775 }
776
777 static void
778 write_bytes_callback (GObject      *stream,
779                       GAsyncResult *result,
780                       gpointer      user_data)
781 {
782   GTask *task = user_data;
783   GError *error = NULL;
784   gssize nwrote;
785
786   nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream),
787                                          result, &error);
788   if (nwrote == -1)
789     g_task_return_error (task, error);
790   else
791     g_task_return_int (task, nwrote);
792   g_object_unref (task);
793 }
794
795 /**
796  * g_output_stream_write_bytes_async:
797  * @stream: A #GOutputStream.
798  * @bytes: The bytes to write
799  * @io_priority: the io priority of the request.
800  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
801  * @callback: (scope async): callback to call when the request is satisfied
802  * @user_data: (closure): the data to pass to callback function
803  *
804  * Request an asynchronous write of the data in @bytes to the stream.
805  * When the operation is finished @callback will be called. You can
806  * then call g_output_stream_write_bytes_finish() to get the result of
807  * the operation.
808  *
809  * During an async request no other sync and async calls are allowed,
810  * and will result in %G_IO_ERROR_PENDING errors.
811  *
812  * A #GBytes larger than %G_MAXSSIZE will cause a
813  * %G_IO_ERROR_INVALID_ARGUMENT error.
814  *
815  * On success, the number of bytes written will be passed to the
816  * @callback. It is not an error if this is not the same as the
817  * requested size, as it can happen e.g. on a partial I/O error,
818  * but generally we try to write as many bytes as requested.
819  *
820  * You are guaranteed that this method will never fail with
821  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
822  * method will just wait until this changes.
823  *
824  * Any outstanding I/O request with higher priority (lower numerical
825  * value) will be executed before an outstanding request with lower
826  * priority. Default priority is %G_PRIORITY_DEFAULT.
827  *
828  * For the synchronous, blocking version of this function, see
829  * g_output_stream_write_bytes().
830  **/
831 void
832 g_output_stream_write_bytes_async (GOutputStream       *stream,
833                                    GBytes              *bytes,
834                                    int                  io_priority,
835                                    GCancellable        *cancellable,
836                                    GAsyncReadyCallback  callback,
837                                    gpointer             user_data)
838 {
839   GTask *task;
840   gsize size;
841   gconstpointer data;
842
843   data = g_bytes_get_data (bytes, &size);
844
845   task = g_task_new (stream, cancellable, callback, user_data);
846   g_task_set_task_data (task, g_bytes_ref (bytes),
847                         (GDestroyNotify) g_bytes_unref);
848
849   g_output_stream_write_async (stream,
850                                data, size,
851                                io_priority,
852                                cancellable,
853                                write_bytes_callback,
854                                task);
855 }
856
857 /**
858  * g_output_stream_write_bytes_finish:
859  * @stream: a #GOutputStream.
860  * @result: a #GAsyncResult.
861  * @error: a #GError location to store the error occurring, or %NULL to
862  * ignore.
863  *
864  * Finishes a stream write-from-#GBytes operation.
865  *
866  * Returns: a #gssize containing the number of bytes written to the stream.
867  **/
868 gssize
869 g_output_stream_write_bytes_finish (GOutputStream  *stream,
870                                     GAsyncResult   *result,
871                                     GError        **error)
872 {
873   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
874   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
875
876   return g_task_propagate_int (G_TASK (result), error);
877 }
878
879 typedef struct {
880   GInputStream *source;
881   gpointer user_data;
882   GAsyncReadyCallback callback;
883 } SpliceUserData;
884
885 static void
886 async_ready_splice_callback_wrapper (GObject      *source_object,
887                                      GAsyncResult *res,
888                                      gpointer     _data)
889 {
890   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
891   GOutputStreamClass *class;
892   GTask *task = _data;
893   gssize nspliced;
894   GError *error = NULL;
895
896   g_output_stream_clear_pending (stream);
897   
898   if (g_async_result_legacy_propagate_error (res, &error))
899     nspliced = -1;
900   else
901     {
902       class = G_OUTPUT_STREAM_GET_CLASS (stream);
903       nspliced = class->splice_finish (stream, res, &error);
904     }
905
906   if (nspliced >= 0)
907     g_task_return_int (task, nspliced);
908   else
909     g_task_return_error (task, error);
910   g_object_unref (task);
911 }
912
913 /**
914  * g_output_stream_splice_async:
915  * @stream: a #GOutputStream.
916  * @source: a #GInputStream. 
917  * @flags: a set of #GOutputStreamSpliceFlags.
918  * @io_priority: the io priority of the request.
919  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
920  * @callback: (scope async): a #GAsyncReadyCallback. 
921  * @user_data: (closure): user data passed to @callback.
922  * 
923  * Splices a stream asynchronously.
924  * When the operation is finished @callback will be called.
925  * You can then call g_output_stream_splice_finish() to get the 
926  * result of the operation.
927  *
928  * For the synchronous, blocking version of this function, see 
929  * g_output_stream_splice().
930  **/
931 void
932 g_output_stream_splice_async (GOutputStream            *stream,
933                               GInputStream             *source,
934                               GOutputStreamSpliceFlags  flags,
935                               int                       io_priority,
936                               GCancellable             *cancellable,
937                               GAsyncReadyCallback       callback,
938                               gpointer                  user_data)
939 {
940   GOutputStreamClass *class;
941   GTask *task;
942   GError *error = NULL;
943
944   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
945   g_return_if_fail (G_IS_INPUT_STREAM (source));
946
947   task = g_task_new (stream, cancellable, callback, user_data);
948   g_task_set_source_tag (task, g_output_stream_splice_async);
949   g_task_set_priority (task, io_priority);
950   g_task_set_task_data (task, g_object_ref (source), g_object_unref);
951
952   if (g_input_stream_is_closed (source))
953     {
954       g_task_return_new_error (task,
955                                G_IO_ERROR, G_IO_ERROR_CLOSED,
956                                _("Source stream is already closed"));
957       g_object_unref (task);
958       return;
959     }
960   
961   if (!g_output_stream_set_pending (stream, &error))
962     {
963       g_task_return_error (task, error);
964       g_object_unref (task);
965       return;
966     }
967
968   class = G_OUTPUT_STREAM_GET_CLASS (stream);
969
970   class->splice_async (stream, source, flags, io_priority, cancellable,
971                        async_ready_splice_callback_wrapper, task);
972 }
973
974 /**
975  * g_output_stream_splice_finish:
976  * @stream: a #GOutputStream.
977  * @result: a #GAsyncResult.
978  * @error: a #GError location to store the error occurring, or %NULL to 
979  * ignore.
980  *
981  * Finishes an asynchronous stream splice operation.
982  * 
983  * Returns: a #gssize of the number of bytes spliced. Note that if the
984  *     number of bytes spliced is greater than %G_MAXSSIZE, then that
985  *     will be returned, and there is no way to determine the actual
986  *     number of bytes spliced.
987  **/
988 gssize
989 g_output_stream_splice_finish (GOutputStream  *stream,
990                                GAsyncResult   *result,
991                                GError        **error)
992 {
993   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
994   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
995   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_splice_async), FALSE);
996
997   /* @result is always the GTask created by g_output_stream_splice_async();
998    * we called class->splice_finish() from async_ready_splice_callback_wrapper.
999    */
1000   return g_task_propagate_int (G_TASK (result), error);
1001 }
1002
1003 static void
1004 async_ready_flush_callback_wrapper (GObject      *source_object,
1005                                     GAsyncResult *res,
1006                                     gpointer      user_data)
1007 {
1008   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1009   GOutputStreamClass *class;
1010   GTask *task = user_data;
1011   gboolean flushed;
1012   GError *error = NULL;
1013
1014   g_output_stream_clear_pending (stream);
1015   
1016   if (g_async_result_legacy_propagate_error (res, &error))
1017     flushed = FALSE;
1018   else
1019     {
1020       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1021       flushed = class->flush_finish (stream, res, &error);
1022     }
1023
1024   if (flushed)
1025     g_task_return_boolean (task, TRUE);
1026   else
1027     g_task_return_error (task, error);
1028   g_object_unref (task);
1029 }
1030
1031 /**
1032  * g_output_stream_flush_async:
1033  * @stream: a #GOutputStream.
1034  * @io_priority: the io priority of the request.
1035  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
1036  * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied
1037  * @user_data: (closure): the data to pass to callback function
1038  * 
1039  * Forces an asynchronous write of all user-space buffered data for
1040  * the given @stream.
1041  * For behaviour details see g_output_stream_flush().
1042  *
1043  * When the operation is finished @callback will be 
1044  * called. You can then call g_output_stream_flush_finish() to get the 
1045  * result of the operation.
1046  **/
1047 void
1048 g_output_stream_flush_async (GOutputStream       *stream,
1049                              int                  io_priority,
1050                              GCancellable        *cancellable,
1051                              GAsyncReadyCallback  callback,
1052                              gpointer             user_data)
1053 {
1054   GOutputStreamClass *class;
1055   GTask *task;
1056   GError *error = NULL;
1057
1058   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1059
1060   task = g_task_new (stream, cancellable, callback, user_data);
1061   g_task_set_source_tag (task, g_output_stream_flush_async);
1062   g_task_set_priority (task, io_priority);
1063
1064   if (!g_output_stream_set_pending (stream, &error))
1065     {
1066       g_task_return_error (task, error);
1067       g_object_unref (task);
1068       return;
1069     }
1070
1071   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1072   
1073   if (class->flush_async == NULL)
1074     {
1075       g_task_return_boolean (task, TRUE);
1076       g_object_unref (task);
1077       return;
1078     }
1079       
1080   class->flush_async (stream, io_priority, cancellable,
1081                       async_ready_flush_callback_wrapper, task);
1082 }
1083
1084 /**
1085  * g_output_stream_flush_finish:
1086  * @stream: a #GOutputStream.
1087  * @result: a GAsyncResult.
1088  * @error: a #GError location to store the error occurring, or %NULL to 
1089  * ignore.
1090  * 
1091  * Finishes flushing an output stream.
1092  * 
1093  * Returns: %TRUE if flush operation succeeded, %FALSE otherwise.
1094  **/
1095 gboolean
1096 g_output_stream_flush_finish (GOutputStream  *stream,
1097                               GAsyncResult   *result,
1098                               GError        **error)
1099 {
1100   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1101   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1102   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_flush_async), FALSE);
1103
1104   /* @result is always the GTask created by g_output_stream_flush_async();
1105    * we called class->flush_finish() from async_ready_flush_callback_wrapper.
1106    */
1107   return g_task_propagate_boolean (G_TASK (result), error);
1108 }
1109
1110
1111 static void
1112 async_ready_close_callback_wrapper (GObject      *source_object,
1113                                     GAsyncResult *res,
1114                                     gpointer      user_data)
1115 {
1116   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1117   GOutputStreamClass *class;
1118   GTask *task = user_data;
1119   GError *error = g_task_get_task_data (task);
1120
1121   stream->priv->closing = FALSE;
1122   stream->priv->closed = TRUE;
1123
1124   if (!error && !g_async_result_legacy_propagate_error (res, &error))
1125     {
1126       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1127
1128       class->close_finish (stream, res,
1129                            error ? NULL : &error);
1130     }
1131
1132   g_output_stream_clear_pending (stream);
1133
1134   if (error != NULL)
1135     g_task_return_error (task, error);
1136   else
1137     g_task_return_boolean (task, TRUE);
1138   g_object_unref (task);
1139 }
1140
1141 static void
1142 async_ready_close_flushed_callback_wrapper (GObject      *source_object,
1143                                             GAsyncResult *res,
1144                                             gpointer      user_data)
1145 {
1146   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1147   GOutputStreamClass *class;
1148   GTask *task = user_data;
1149   GError *error = NULL;
1150
1151   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1152
1153   if (!g_async_result_legacy_propagate_error (res, &error))
1154     {
1155       class->flush_finish (stream, res, &error);
1156     }
1157
1158   /* propagate the possible error */
1159   if (error)
1160     g_task_set_task_data (task, error, NULL);
1161
1162   /* we still close, even if there was a flush error */
1163   class->close_async (stream,
1164                       g_task_get_priority (task),
1165                       g_task_get_cancellable (task),
1166                       async_ready_close_callback_wrapper, task);
1167 }
1168
1169 /**
1170  * g_output_stream_close_async:
1171  * @stream: A #GOutputStream.
1172  * @io_priority: the io priority of the request.
1173  * @cancellable: (allow-none): optional cancellable object
1174  * @callback: (scope async): callback to call when the request is satisfied
1175  * @user_data: (closure): the data to pass to callback function
1176  *
1177  * Requests an asynchronous close of the stream, releasing resources 
1178  * related to it. When the operation is finished @callback will be 
1179  * called. You can then call g_output_stream_close_finish() to get 
1180  * the result of the operation.
1181  *
1182  * For behaviour details see g_output_stream_close().
1183  *
1184  * The asyncronous methods have a default fallback that uses threads 
1185  * to implement asynchronicity, so they are optional for inheriting 
1186  * classes. However, if you override one you must override all.
1187  **/
1188 void
1189 g_output_stream_close_async (GOutputStream       *stream,
1190                              int                  io_priority,
1191                              GCancellable        *cancellable,
1192                              GAsyncReadyCallback  callback,
1193                              gpointer             user_data)
1194 {
1195   GOutputStreamClass *class;
1196   GTask *task;
1197   GError *error = NULL;
1198
1199   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1200   
1201   task = g_task_new (stream, cancellable, callback, user_data);
1202   g_task_set_source_tag (task, g_output_stream_close_async);
1203   g_task_set_priority (task, io_priority);
1204
1205   if (stream->priv->closed)
1206     {
1207       g_task_return_boolean (task, TRUE);
1208       g_object_unref (task);
1209       return;
1210     }
1211
1212   if (!g_output_stream_set_pending (stream, &error))
1213     {
1214       g_task_return_error (task, error);
1215       g_object_unref (task);
1216       return;
1217     }
1218   
1219   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1220   stream->priv->closing = TRUE;
1221
1222   /* Call close_async directly if there is no need to flush, or if the flush
1223      can be done sync (in the output stream async close thread) */
1224   if (class->flush_async == NULL ||
1225       (class->flush_async == g_output_stream_real_flush_async &&
1226        (class->flush == NULL || class->close_async == g_output_stream_real_close_async)))
1227     {
1228       class->close_async (stream, io_priority, cancellable,
1229                           async_ready_close_callback_wrapper, task);
1230     }
1231   else
1232     {
1233       /* First do an async flush, then do the async close in the callback
1234          wrapper (see async_ready_close_flushed_callback_wrapper) */
1235       class->flush_async (stream, io_priority, cancellable,
1236                           async_ready_close_flushed_callback_wrapper, task);
1237     }
1238 }
1239
1240 /**
1241  * g_output_stream_close_finish:
1242  * @stream: a #GOutputStream.
1243  * @result: a #GAsyncResult.
1244  * @error: a #GError location to store the error occurring, or %NULL to 
1245  * ignore.
1246  * 
1247  * Closes an output stream.
1248  * 
1249  * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
1250  **/
1251 gboolean
1252 g_output_stream_close_finish (GOutputStream  *stream,
1253                               GAsyncResult   *result,
1254                               GError        **error)
1255 {
1256   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1257   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1258   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_close_async), FALSE);
1259
1260   /* @result is always the GTask created by g_output_stream_close_async();
1261    * we called class->close_finish() from async_ready_close_callback_wrapper.
1262    */
1263   return g_task_propagate_boolean (G_TASK (result), error);
1264 }
1265
1266 /**
1267  * g_output_stream_is_closed:
1268  * @stream: a #GOutputStream.
1269  * 
1270  * Checks if an output stream has already been closed.
1271  * 
1272  * Returns: %TRUE if @stream is closed. %FALSE otherwise. 
1273  **/
1274 gboolean
1275 g_output_stream_is_closed (GOutputStream *stream)
1276 {
1277   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1278   
1279   return stream->priv->closed;
1280 }
1281
1282 /**
1283  * g_output_stream_is_closing:
1284  * @stream: a #GOutputStream.
1285  *
1286  * Checks if an output stream is being closed. This can be
1287  * used inside e.g. a flush implementation to see if the
1288  * flush (or other i/o operation) is called from within
1289  * the closing operation.
1290  *
1291  * Returns: %TRUE if @stream is being closed. %FALSE otherwise.
1292  *
1293  * Since: 2.24
1294  **/
1295 gboolean
1296 g_output_stream_is_closing (GOutputStream *stream)
1297 {
1298   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1299
1300   return stream->priv->closing;
1301 }
1302
1303 /**
1304  * g_output_stream_has_pending:
1305  * @stream: a #GOutputStream.
1306  * 
1307  * Checks if an ouput stream has pending actions.
1308  * 
1309  * Returns: %TRUE if @stream has pending actions. 
1310  **/
1311 gboolean
1312 g_output_stream_has_pending (GOutputStream *stream)
1313 {
1314   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1315   
1316   return stream->priv->pending;
1317 }
1318
1319 /**
1320  * g_output_stream_set_pending:
1321  * @stream: a #GOutputStream.
1322  * @error: a #GError location to store the error occurring, or %NULL to 
1323  * ignore.
1324  * 
1325  * Sets @stream to have actions pending. If the pending flag is
1326  * already set or @stream is closed, it will return %FALSE and set
1327  * @error.
1328  *
1329  * Return value: %TRUE if pending was previously unset and is now set.
1330  **/
1331 gboolean
1332 g_output_stream_set_pending (GOutputStream *stream,
1333                              GError **error)
1334 {
1335   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1336   
1337   if (stream->priv->closed)
1338     {
1339       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1340                            _("Stream is already closed"));
1341       return FALSE;
1342     }
1343   
1344   if (stream->priv->pending)
1345     {
1346       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1347                            /* Translators: This is an error you get if there is
1348                             * already an operation running against this stream when
1349                             * you try to start one */
1350                            _("Stream has outstanding operation"));
1351       return FALSE;
1352     }
1353   
1354   stream->priv->pending = TRUE;
1355   return TRUE;
1356 }
1357
1358 /**
1359  * g_output_stream_clear_pending:
1360  * @stream: output stream
1361  * 
1362  * Clears the pending flag on @stream.
1363  **/
1364 void
1365 g_output_stream_clear_pending (GOutputStream *stream)
1366 {
1367   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1368   
1369   stream->priv->pending = FALSE;
1370 }
1371
1372
1373 /********************************************
1374  *   Default implementation of async ops    *
1375  ********************************************/
1376
1377 typedef struct {
1378   const void         *buffer;
1379   gsize               count_requested;
1380   gssize              count_written;
1381 } WriteData;
1382
1383 static void
1384 free_write_data (WriteData *op)
1385 {
1386   g_slice_free (WriteData, op);
1387 }
1388
1389 static void
1390 write_async_thread (GTask        *task,
1391                     gpointer      source_object,
1392                     gpointer      task_data,
1393                     GCancellable *cancellable)
1394 {
1395   GOutputStream *stream = source_object;
1396   WriteData *op = task_data;
1397   GOutputStreamClass *class;
1398   GError *error = NULL;
1399   gssize count_written;
1400
1401   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1402   count_written = class->write_fn (stream, op->buffer, op->count_requested,
1403                                    cancellable, &error);
1404   if (count_written == -1)
1405     g_task_return_error (task, error);
1406   else
1407     g_task_return_int (task, count_written);
1408 }
1409
1410 static void write_async_pollable (GPollableOutputStream *stream,
1411                                   GTask                 *task);
1412
1413 static gboolean
1414 write_async_pollable_ready (GPollableOutputStream *stream,
1415                             gpointer               user_data)
1416 {
1417   GTask *task = user_data;
1418
1419   write_async_pollable (stream, task);
1420   return FALSE;
1421 }
1422
1423 static void
1424 write_async_pollable (GPollableOutputStream *stream,
1425                       GTask                 *task)
1426 {
1427   GError *error = NULL;
1428   WriteData *op = g_task_get_task_data (task);
1429   gssize count_written;
1430
1431   if (g_task_return_error_if_cancelled (task))
1432     return;
1433
1434   count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
1435     write_nonblocking (stream, op->buffer, op->count_requested, &error);
1436
1437   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1438     {
1439       GSource *source;
1440
1441       g_error_free (error);
1442
1443       source = g_pollable_output_stream_create_source (stream,
1444                                                        g_task_get_cancellable (task));
1445       g_task_attach_source (task, source,
1446                             (GSourceFunc) write_async_pollable_ready);
1447       g_source_unref (source);
1448       return;
1449     }
1450
1451   if (count_written == -1)
1452     g_task_return_error (task, error);
1453   else
1454     g_task_return_int (task, count_written);
1455 }
1456
1457 static void
1458 g_output_stream_real_write_async (GOutputStream       *stream,
1459                                   const void          *buffer,
1460                                   gsize                count,
1461                                   int                  io_priority,
1462                                   GCancellable        *cancellable,
1463                                   GAsyncReadyCallback  callback,
1464                                   gpointer             user_data)
1465 {
1466   GTask *task;
1467   WriteData *op;
1468
1469   op = g_slice_new0 (WriteData);
1470   task = g_task_new (stream, cancellable, callback, user_data);
1471   g_task_set_check_cancellable (task, FALSE);
1472   g_task_set_task_data (task, op, (GDestroyNotify) free_write_data);
1473   op->buffer = buffer;
1474   op->count_requested = count;
1475
1476   if (G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
1477       g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream)))
1478     write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
1479   else
1480     g_task_run_in_thread (task, write_async_thread);
1481   g_object_unref (task);
1482 }
1483
1484 static gssize
1485 g_output_stream_real_write_finish (GOutputStream  *stream,
1486                                    GAsyncResult   *result,
1487                                    GError        **error)
1488 {
1489   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1490
1491   return g_task_propagate_int (G_TASK (result), error);
1492 }
1493
1494 typedef struct {
1495   GInputStream *source;
1496   GOutputStreamSpliceFlags flags;
1497 } SpliceData;
1498
1499 static void
1500 free_splice_data (SpliceData *op)
1501 {
1502   g_object_unref (op->source);
1503   g_free (op);
1504 }
1505
1506 static void
1507 splice_async_thread (GTask        *task,
1508                      gpointer      source_object,
1509                      gpointer      task_data,
1510                      GCancellable *cancellable)
1511 {
1512   GOutputStream *stream = source_object;
1513   SpliceData *op = task_data;
1514   GOutputStreamClass *class;
1515   GError *error = NULL;
1516   gssize bytes_copied;
1517
1518   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1519   
1520   bytes_copied = class->splice (stream,
1521                                 op->source,
1522                                 op->flags,
1523                                 cancellable,
1524                                 &error);
1525   if (bytes_copied == -1)
1526     g_task_return_error (task, error);
1527   else
1528     g_task_return_int (task, bytes_copied);
1529 }
1530
1531 static void
1532 g_output_stream_real_splice_async (GOutputStream             *stream,
1533                                    GInputStream              *source,
1534                                    GOutputStreamSpliceFlags   flags,
1535                                    int                        io_priority,
1536                                    GCancellable              *cancellable,
1537                                    GAsyncReadyCallback        callback,
1538                                    gpointer                   user_data)
1539 {
1540   GTask *task;
1541   SpliceData *op;
1542
1543   op = g_new0 (SpliceData, 1);
1544   task = g_task_new (stream, cancellable, callback, user_data);
1545   g_task_set_task_data (task, op, (GDestroyNotify)free_splice_data);
1546   op->flags = flags;
1547   op->source = g_object_ref (source);
1548
1549   /* TODO: In the case where both source and destintion have
1550      non-threadbased async calls we can use a true async copy here */
1551   
1552   g_task_run_in_thread (task, splice_async_thread);
1553   g_object_unref (task);
1554 }
1555
1556 static gssize
1557 g_output_stream_real_splice_finish (GOutputStream  *stream,
1558                                     GAsyncResult   *result,
1559                                     GError        **error)
1560 {
1561   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1562
1563   return g_task_propagate_int (G_TASK (result), error);
1564 }
1565
1566
1567 static void
1568 flush_async_thread (GTask        *task,
1569                     gpointer      source_object,
1570                     gpointer      task_data,
1571                     GCancellable *cancellable)
1572 {
1573   GOutputStream *stream = source_object;
1574   GOutputStreamClass *class;
1575   gboolean result;
1576   GError *error = NULL;
1577
1578   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1579   result = TRUE;
1580   if (class->flush)
1581     result = class->flush (stream, cancellable, &error);
1582
1583   if (result)
1584     g_task_return_boolean (task, TRUE);
1585   else
1586     g_task_return_error (task, error);
1587 }
1588
1589 static void
1590 g_output_stream_real_flush_async (GOutputStream       *stream,
1591                                   int                  io_priority,
1592                                   GCancellable        *cancellable,
1593                                   GAsyncReadyCallback  callback,
1594                                   gpointer             user_data)
1595 {
1596   GTask *task;
1597
1598   task = g_task_new (stream, cancellable, callback, user_data);
1599   g_task_set_priority (task, io_priority);
1600   g_task_run_in_thread (task, flush_async_thread);
1601   g_object_unref (task);
1602 }
1603
1604 static gboolean
1605 g_output_stream_real_flush_finish (GOutputStream  *stream,
1606                                    GAsyncResult   *result,
1607                                    GError        **error)
1608 {
1609   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1610
1611   return g_task_propagate_boolean (G_TASK (result), error);
1612 }
1613
1614 static void
1615 close_async_thread (GTask        *task,
1616                     gpointer      source_object,
1617                     gpointer      task_data,
1618                     GCancellable *cancellable)
1619 {
1620   GOutputStream *stream = source_object;
1621   GOutputStreamClass *class;
1622   GError *error = NULL;
1623   gboolean result = TRUE;
1624
1625   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1626
1627   /* Do a flush here if there is a flush function, and we did not have to do
1628    * an async flush before (see g_output_stream_close_async)
1629    */
1630   if (class->flush != NULL &&
1631       (class->flush_async == NULL ||
1632        class->flush_async == g_output_stream_real_flush_async))
1633     {
1634       result = class->flush (stream, cancellable, &error);
1635     }
1636
1637   /* Auto handling of cancelation disabled, and ignore
1638      cancellation, since we want to close things anyway, although
1639      possibly in a quick-n-dirty way. At least we never want to leak
1640      open handles */
1641
1642   if (class->close_fn)
1643     {
1644       /* Make sure to close, even if the flush failed (see sync close) */
1645       if (!result)
1646         class->close_fn (stream, cancellable, NULL);
1647       else
1648         result = class->close_fn (stream, cancellable, &error);
1649     }
1650
1651   if (result)
1652     g_task_return_boolean (task, TRUE);
1653   else
1654     g_task_return_error (task, error);
1655 }
1656
1657 static void
1658 g_output_stream_real_close_async (GOutputStream       *stream,
1659                                   int                  io_priority,
1660                                   GCancellable        *cancellable,
1661                                   GAsyncReadyCallback  callback,
1662                                   gpointer             user_data)
1663 {
1664   GTask *task;
1665
1666   task = g_task_new (stream, cancellable, callback, user_data);
1667   g_task_set_priority (task, io_priority);
1668   g_task_run_in_thread (task, close_async_thread);
1669   g_object_unref (task);
1670 }
1671
1672 static gboolean
1673 g_output_stream_real_close_finish (GOutputStream  *stream,
1674                                    GAsyncResult   *result,
1675                                    GError        **error)
1676 {
1677   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1678
1679   return g_task_propagate_boolean (G_TASK (result), error);
1680 }