goutputstream: Add clear warning about short writes to _write_bytes() and async version
[platform/upstream/glib.git] / gio / goutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24 #include "goutputstream.h"
25 #include "gcancellable.h"
26 #include "gasyncresult.h"
27 #include "gtask.h"
28 #include "ginputstream.h"
29 #include "gioerror.h"
30 #include "gioprivate.h"
31 #include "glibintl.h"
32 #include "gpollableoutputstream.h"
33
34 /**
35  * SECTION:goutputstream
36  * @short_description: Base class for implementing streaming output
37  * @include: gio/gio.h
38  *
39  * #GOutputStream has functions to write to a stream (g_output_stream_write()),
40  * to close a stream (g_output_stream_close()) and to flush pending writes
41  * (g_output_stream_flush()). 
42  *
43  * To copy the content of an input stream to an output stream without 
44  * manually handling the reads and writes, use g_output_stream_splice(). 
45  *
46  * All of these functions have async variants too.
47  **/
48
49 struct _GOutputStreamPrivate {
50   guint closed : 1;
51   guint pending : 1;
52   guint closing : 1;
53   GAsyncReadyCallback outstanding_callback;
54 };
55
56 G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GOutputStream, g_output_stream, G_TYPE_OBJECT)
57
58 static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
59                                                     GInputStream              *source,
60                                                     GOutputStreamSpliceFlags   flags,
61                                                     GCancellable              *cancellable,
62                                                     GError                   **error);
63 static void     g_output_stream_real_write_async   (GOutputStream             *stream,
64                                                     const void                *buffer,
65                                                     gsize                      count,
66                                                     int                        io_priority,
67                                                     GCancellable              *cancellable,
68                                                     GAsyncReadyCallback        callback,
69                                                     gpointer                   data);
70 static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
71                                                     GAsyncResult              *result,
72                                                     GError                   **error);
73 static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
74                                                     GInputStream              *source,
75                                                     GOutputStreamSpliceFlags   flags,
76                                                     int                        io_priority,
77                                                     GCancellable              *cancellable,
78                                                     GAsyncReadyCallback        callback,
79                                                     gpointer                   data);
80 static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
81                                                     GAsyncResult              *result,
82                                                     GError                   **error);
83 static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
84                                                     int                        io_priority,
85                                                     GCancellable              *cancellable,
86                                                     GAsyncReadyCallback        callback,
87                                                     gpointer                   data);
88 static gboolean g_output_stream_real_flush_finish  (GOutputStream             *stream,
89                                                     GAsyncResult              *result,
90                                                     GError                   **error);
91 static void     g_output_stream_real_close_async   (GOutputStream             *stream,
92                                                     int                        io_priority,
93                                                     GCancellable              *cancellable,
94                                                     GAsyncReadyCallback        callback,
95                                                     gpointer                   data);
96 static gboolean g_output_stream_real_close_finish  (GOutputStream             *stream,
97                                                     GAsyncResult              *result,
98                                                     GError                   **error);
99 static gboolean g_output_stream_internal_close     (GOutputStream             *stream,
100                                                     GCancellable              *cancellable,
101                                                     GError                   **error);
102 static void     g_output_stream_internal_close_async (GOutputStream           *stream,
103                                                       int                      io_priority,
104                                                       GCancellable            *cancellable,
105                                                       GAsyncReadyCallback      callback,
106                                                       gpointer                 data);
107 static gboolean g_output_stream_internal_close_finish (GOutputStream          *stream,
108                                                        GAsyncResult           *result,
109                                                        GError                **error);
110
111 static void
112 g_output_stream_dispose (GObject *object)
113 {
114   GOutputStream *stream;
115
116   stream = G_OUTPUT_STREAM (object);
117   
118   if (!stream->priv->closed)
119     g_output_stream_close (stream, NULL, NULL);
120
121   G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object);
122 }
123
124 static void
125 g_output_stream_class_init (GOutputStreamClass *klass)
126 {
127   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
128
129   gobject_class->dispose = g_output_stream_dispose;
130
131   klass->splice = g_output_stream_real_splice;
132   
133   klass->write_async = g_output_stream_real_write_async;
134   klass->write_finish = g_output_stream_real_write_finish;
135   klass->splice_async = g_output_stream_real_splice_async;
136   klass->splice_finish = g_output_stream_real_splice_finish;
137   klass->flush_async = g_output_stream_real_flush_async;
138   klass->flush_finish = g_output_stream_real_flush_finish;
139   klass->close_async = g_output_stream_real_close_async;
140   klass->close_finish = g_output_stream_real_close_finish;
141 }
142
143 static void
144 g_output_stream_init (GOutputStream *stream)
145 {
146   stream->priv = g_output_stream_get_instance_private (stream);
147 }
148
149 /**
150  * g_output_stream_write:
151  * @stream: a #GOutputStream.
152  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
153  * @count: the number of bytes to write
154  * @cancellable: (allow-none): optional cancellable object
155  * @error: location to store the error occurring, or %NULL to ignore
156  *
157  * Tries to write @count bytes from @buffer into the stream. Will block
158  * during the operation.
159  * 
160  * If count is 0, returns 0 and does nothing. A value of @count
161  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
162  *
163  * On success, the number of bytes written to the stream is returned.
164  * It is not an error if this is not the same as the requested size, as it
165  * can happen e.g. on a partial I/O error, or if there is not enough
166  * storage in the stream. All writes block until at least one byte
167  * is written or an error occurs; 0 is never returned (unless
168  * @count is 0).
169  * 
170  * If @cancellable is not %NULL, then the operation can be cancelled by
171  * triggering the cancellable object from another thread. If the operation
172  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
173  * operation was partially finished when the operation was cancelled the
174  * partial result will be returned, without an error.
175  *
176  * On error -1 is returned and @error is set accordingly.
177  * 
178  * Virtual: write_fn
179  *
180  * Return value: Number of bytes written, or -1 on error
181  **/
182 gssize
183 g_output_stream_write (GOutputStream  *stream,
184                        const void     *buffer,
185                        gsize           count,
186                        GCancellable   *cancellable,
187                        GError        **error)
188 {
189   GOutputStreamClass *class;
190   gssize res;
191
192   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
193   g_return_val_if_fail (buffer != NULL, 0);
194
195   if (count == 0)
196     return 0;
197   
198   if (((gssize) count) < 0)
199     {
200       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
201                    _("Too large count value passed to %s"), G_STRFUNC);
202       return -1;
203     }
204
205   class = G_OUTPUT_STREAM_GET_CLASS (stream);
206
207   if (class->write_fn == NULL) 
208     {
209       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
210                            _("Output stream doesn't implement write"));
211       return -1;
212     }
213   
214   if (!g_output_stream_set_pending (stream, error))
215     return -1;
216   
217   if (cancellable)
218     g_cancellable_push_current (cancellable);
219   
220   res = class->write_fn (stream, buffer, count, cancellable, error);
221   
222   if (cancellable)
223     g_cancellable_pop_current (cancellable);
224   
225   g_output_stream_clear_pending (stream);
226
227   return res; 
228 }
229
230 /**
231  * g_output_stream_write_all:
232  * @stream: a #GOutputStream.
233  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
234  * @count: the number of bytes to write
235  * @bytes_written: (out): location to store the number of bytes that was 
236  *     written to the stream
237  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
238  * @error: location to store the error occurring, or %NULL to ignore
239  *
240  * Tries to write @count bytes from @buffer into the stream. Will block
241  * during the operation.
242  * 
243  * This function is similar to g_output_stream_write(), except it tries to
244  * write as many bytes as requested, only stopping on an error.
245  *
246  * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
247  * is set to @count.
248  * 
249  * If there is an error during the operation %FALSE is returned and @error
250  * is set to indicate the error status, @bytes_written is updated to contain
251  * the number of bytes written into the stream before the error occurred.
252  *
253  * Return value: %TRUE on success, %FALSE if there was an error
254  **/
255 gboolean
256 g_output_stream_write_all (GOutputStream  *stream,
257                            const void     *buffer,
258                            gsize           count,
259                            gsize          *bytes_written,
260                            GCancellable   *cancellable,
261                            GError        **error)
262 {
263   gsize _bytes_written;
264   gssize res;
265
266   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
267   g_return_val_if_fail (buffer != NULL, FALSE);
268
269   _bytes_written = 0;
270   while (_bytes_written < count)
271     {
272       res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
273                                    cancellable, error);
274       if (res == -1)
275         {
276           if (bytes_written)
277             *bytes_written = _bytes_written;
278           return FALSE;
279         }
280       
281       if (res == 0)
282         g_warning ("Write returned zero without error");
283
284       _bytes_written += res;
285     }
286   
287   if (bytes_written)
288     *bytes_written = _bytes_written;
289
290   return TRUE;
291 }
292
293 /**
294  * g_output_stream_write_bytes:
295  * @stream: a #GOutputStream.
296  * @bytes: the #GBytes to write
297  * @cancellable: (allow-none): optional cancellable object
298  * @error: location to store the error occurring, or %NULL to ignore
299  *
300  * A wrapper function for g_output_stream_write() which takes a
301  * #GBytes as input.  This can be more convenient for use by language
302  * bindings or in other cases where the refcounted nature of #GBytes
303  * is helpful over a bare pointer interface.
304  *
305  * However, note that this function <emphasis>may</emphasis> still
306  * perform partial writes, just like g_output_stream_write().  If that
307  * occurs, to continue writing, you will need to create a new #GBytes
308  * containing just the remaining bytes, using
309  * g_bytes_new_from_bytes().  Passing the same #GBytes instance
310  * multiple times potentially can result in duplicated data in the
311  * output stream.
312  *
313  * Return value: Number of bytes written, or -1 on error
314  **/
315 gssize
316 g_output_stream_write_bytes (GOutputStream  *stream,
317                              GBytes         *bytes,
318                              GCancellable   *cancellable,
319                              GError        **error)
320 {
321   gsize size;
322   gconstpointer data;
323
324   data = g_bytes_get_data (bytes, &size);
325
326   return g_output_stream_write (stream,
327                                 data, size,
328                                 cancellable,
329                                 error);
330 }
331
332 /**
333  * g_output_stream_flush:
334  * @stream: a #GOutputStream.
335  * @cancellable: (allow-none): optional cancellable object
336  * @error: location to store the error occurring, or %NULL to ignore
337  *
338  * Forces a write of all user-space buffered data for the given
339  * @stream. Will block during the operation. Closing the stream will
340  * implicitly cause a flush.
341  *
342  * This function is optional for inherited classes.
343  * 
344  * If @cancellable is not %NULL, then the operation can be cancelled by
345  * triggering the cancellable object from another thread. If the operation
346  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
347  *
348  * Return value: %TRUE on success, %FALSE on error
349  **/
350 gboolean
351 g_output_stream_flush (GOutputStream  *stream,
352                        GCancellable   *cancellable,
353                        GError        **error)
354 {
355   GOutputStreamClass *class;
356   gboolean res;
357
358   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
359
360   if (!g_output_stream_set_pending (stream, error))
361     return FALSE;
362   
363   class = G_OUTPUT_STREAM_GET_CLASS (stream);
364
365   res = TRUE;
366   if (class->flush)
367     {
368       if (cancellable)
369         g_cancellable_push_current (cancellable);
370       
371       res = class->flush (stream, cancellable, error);
372       
373       if (cancellable)
374         g_cancellable_pop_current (cancellable);
375     }
376   
377   g_output_stream_clear_pending (stream);
378
379   return res;
380 }
381
382 /**
383  * g_output_stream_splice:
384  * @stream: a #GOutputStream.
385  * @source: a #GInputStream.
386  * @flags: a set of #GOutputStreamSpliceFlags.
387  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
388  * @error: a #GError location to store the error occurring, or %NULL to
389  * ignore.
390  *
391  * Splices an input stream into an output stream.
392  *
393  * Returns: a #gssize containing the size of the data spliced, or
394  *     -1 if an error occurred. Note that if the number of bytes
395  *     spliced is greater than %G_MAXSSIZE, then that will be
396  *     returned, and there is no way to determine the actual number
397  *     of bytes spliced.
398  **/
399 gssize
400 g_output_stream_splice (GOutputStream             *stream,
401                         GInputStream              *source,
402                         GOutputStreamSpliceFlags   flags,
403                         GCancellable              *cancellable,
404                         GError                   **error)
405 {
406   GOutputStreamClass *class;
407   gssize bytes_copied;
408
409   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
410   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
411
412   if (g_input_stream_is_closed (source))
413     {
414       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
415                            _("Source stream is already closed"));
416       return -1;
417     }
418
419   if (!g_output_stream_set_pending (stream, error))
420     return -1;
421
422   class = G_OUTPUT_STREAM_GET_CLASS (stream);
423
424   if (cancellable)
425     g_cancellable_push_current (cancellable);
426
427   bytes_copied = class->splice (stream, source, flags, cancellable, error);
428
429   if (cancellable)
430     g_cancellable_pop_current (cancellable);
431
432   g_output_stream_clear_pending (stream);
433
434   return bytes_copied;
435 }
436
437 static gssize
438 g_output_stream_real_splice (GOutputStream             *stream,
439                              GInputStream              *source,
440                              GOutputStreamSpliceFlags   flags,
441                              GCancellable              *cancellable,
442                              GError                   **error)
443 {
444   GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
445   gssize n_read, n_written;
446   gsize bytes_copied;
447   char buffer[8192], *p;
448   gboolean res;
449
450   bytes_copied = 0;
451   if (class->write_fn == NULL)
452     {
453       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
454                            _("Output stream doesn't implement write"));
455       res = FALSE;
456       goto notsupported;
457     }
458
459   res = TRUE;
460   do
461     {
462       n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
463       if (n_read == -1)
464         {
465           res = FALSE;
466           break;
467         }
468
469       if (n_read == 0)
470         break;
471
472       p = buffer;
473       while (n_read > 0)
474         {
475           n_written = class->write_fn (stream, p, n_read, cancellable, error);
476           if (n_written == -1)
477             {
478               res = FALSE;
479               break;
480             }
481
482           p += n_written;
483           n_read -= n_written;
484           bytes_copied += n_written;
485         }
486
487       if (bytes_copied > G_MAXSSIZE)
488         bytes_copied = G_MAXSSIZE;
489     }
490   while (res);
491
492  notsupported:
493   if (!res)
494     error = NULL; /* Ignore further errors */
495
496   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
497     {
498       /* Don't care about errors in source here */
499       g_input_stream_close (source, cancellable, NULL);
500     }
501
502   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
503     {
504       /* But write errors on close are bad! */
505       res = g_output_stream_internal_close (stream, cancellable, error);
506     }
507
508   if (res)
509     return bytes_copied;
510
511   return -1;
512 }
513
514 /* Must always be called inside
515  * g_output_stream_set_pending()/g_output_stream_clear_pending(). */
516 static gboolean
517 g_output_stream_internal_close (GOutputStream  *stream,
518                                 GCancellable   *cancellable,
519                                 GError        **error)
520 {
521   GOutputStreamClass *class;
522   gboolean res;
523
524   if (stream->priv->closed)
525     return TRUE;
526
527   class = G_OUTPUT_STREAM_GET_CLASS (stream);
528
529   stream->priv->closing = TRUE;
530
531   if (cancellable)
532     g_cancellable_push_current (cancellable);
533
534   if (class->flush)
535     res = class->flush (stream, cancellable, error);
536   else
537     res = TRUE;
538
539   if (!res)
540     {
541       /* flushing caused the error that we want to return,
542        * but we still want to close the underlying stream if possible
543        */
544       if (class->close_fn)
545         class->close_fn (stream, cancellable, NULL);
546     }
547   else
548     {
549       res = TRUE;
550       if (class->close_fn)
551         res = class->close_fn (stream, cancellable, error);
552     }
553
554   if (cancellable)
555     g_cancellable_pop_current (cancellable);
556
557   stream->priv->closing = FALSE;
558   stream->priv->closed = TRUE;
559
560   return res;
561 }
562
563 /**
564  * g_output_stream_close:
565  * @stream: A #GOutputStream.
566  * @cancellable: (allow-none): optional cancellable object
567  * @error: location to store the error occurring, or %NULL to ignore
568  *
569  * Closes the stream, releasing resources related to it.
570  *
571  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
572  * Closing a stream multiple times will not return an error.
573  *
574  * Closing a stream will automatically flush any outstanding buffers in the
575  * stream.
576  *
577  * Streams will be automatically closed when the last reference
578  * is dropped, but you might want to call this function to make sure 
579  * resources are released as early as possible.
580  *
581  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
582  * open after the stream is closed. See the documentation for the individual
583  * stream for details.
584  *
585  * On failure the first error that happened will be reported, but the close
586  * operation will finish as much as possible. A stream that failed to
587  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
588  * is important to check and report the error to the user, otherwise
589  * there might be a loss of data as all data might not be written.
590  * 
591  * If @cancellable is not %NULL, then the operation can be cancelled by
592  * triggering the cancellable object from another thread. If the operation
593  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
594  * Cancelling a close will still leave the stream closed, but there some streams
595  * can use a faster close that doesn't block to e.g. check errors. On
596  * cancellation (as with any error) there is no guarantee that all written
597  * data will reach the target. 
598  *
599  * Return value: %TRUE on success, %FALSE on failure
600  **/
601 gboolean
602 g_output_stream_close (GOutputStream  *stream,
603                        GCancellable   *cancellable,
604                        GError        **error)
605 {
606   gboolean res;
607
608   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
609
610   if (stream->priv->closed)
611     return TRUE;
612
613   if (!g_output_stream_set_pending (stream, error))
614     return FALSE;
615
616   res = g_output_stream_internal_close (stream, cancellable, error);
617
618   g_output_stream_clear_pending (stream);
619   
620   return res;
621 }
622
623 static void
624 async_ready_write_callback_wrapper (GObject      *source_object,
625                                     GAsyncResult *res,
626                                     gpointer      user_data)
627 {
628   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
629   GOutputStreamClass *class;
630   GTask *task = user_data;
631   gssize nwrote;
632   GError *error = NULL;
633
634   g_output_stream_clear_pending (stream);
635   
636   if (g_async_result_legacy_propagate_error (res, &error))
637     nwrote = -1;
638   else
639     {
640       class = G_OUTPUT_STREAM_GET_CLASS (stream);
641       nwrote = class->write_finish (stream, res, &error);
642     }
643
644   if (nwrote >= 0)
645     g_task_return_int (task, nwrote);
646   else
647     g_task_return_error (task, error);
648   g_object_unref (task);
649 }
650
651 /**
652  * g_output_stream_write_async:
653  * @stream: A #GOutputStream.
654  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
655  * @count: the number of bytes to write
656  * @io_priority: the io priority of the request.
657  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
658  * @callback: (scope async): callback to call when the request is satisfied
659  * @user_data: (closure): the data to pass to callback function
660  *
661  * Request an asynchronous write of @count bytes from @buffer into 
662  * the stream. When the operation is finished @callback will be called.
663  * You can then call g_output_stream_write_finish() to get the result of the 
664  * operation.
665  *
666  * During an async request no other sync and async calls are allowed, 
667  * and will result in %G_IO_ERROR_PENDING errors. 
668  *
669  * A value of @count larger than %G_MAXSSIZE will cause a 
670  * %G_IO_ERROR_INVALID_ARGUMENT error.
671  *
672  * On success, the number of bytes written will be passed to the
673  * @callback. It is not an error if this is not the same as the 
674  * requested size, as it can happen e.g. on a partial I/O error, 
675  * but generally we try to write as many bytes as requested. 
676  *
677  * You are guaranteed that this method will never fail with
678  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
679  * method will just wait until this changes.
680  *
681  * Any outstanding I/O request with higher priority (lower numerical 
682  * value) will be executed before an outstanding request with lower 
683  * priority. Default priority is %G_PRIORITY_DEFAULT.
684  *
685  * The asyncronous methods have a default fallback that uses threads 
686  * to implement asynchronicity, so they are optional for inheriting 
687  * classes. However, if you override one you must override all.
688  *
689  * For the synchronous, blocking version of this function, see 
690  * g_output_stream_write().
691  **/
692 void
693 g_output_stream_write_async (GOutputStream       *stream,
694                              const void          *buffer,
695                              gsize                count,
696                              int                  io_priority,
697                              GCancellable        *cancellable,
698                              GAsyncReadyCallback  callback,
699                              gpointer             user_data)
700 {
701   GOutputStreamClass *class;
702   GError *error = NULL;
703   GTask *task;
704
705   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
706   g_return_if_fail (buffer != NULL);
707
708   task = g_task_new (stream, cancellable, callback, user_data);
709   g_task_set_source_tag (task, g_output_stream_write_async);
710   g_task_set_priority (task, io_priority);
711
712   if (count == 0)
713     {
714       g_task_return_int (task, 0);
715       g_object_unref (task);
716       return;
717     }
718
719   if (((gssize) count) < 0)
720     {
721       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
722                                _("Too large count value passed to %s"),
723                                G_STRFUNC);
724       g_object_unref (task);
725       return;
726     }
727
728   if (!g_output_stream_set_pending (stream, &error))
729     {
730       g_task_return_error (task, error);
731       g_object_unref (task);
732       return;
733     }
734   
735   class = G_OUTPUT_STREAM_GET_CLASS (stream);
736
737   class->write_async (stream, buffer, count, io_priority, cancellable,
738                       async_ready_write_callback_wrapper, task);
739 }
740
741 /**
742  * g_output_stream_write_finish:
743  * @stream: a #GOutputStream.
744  * @result: a #GAsyncResult.
745  * @error: a #GError location to store the error occurring, or %NULL to 
746  * ignore.
747  * 
748  * Finishes a stream write operation.
749  * 
750  * Returns: a #gssize containing the number of bytes written to the stream.
751  **/
752 gssize
753 g_output_stream_write_finish (GOutputStream  *stream,
754                               GAsyncResult   *result,
755                               GError        **error)
756 {
757   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
758   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
759   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_write_async), FALSE);
760
761   /* @result is always the GTask created by g_output_stream_write_async();
762    * we called class->write_finish() from async_ready_write_callback_wrapper.
763    */
764   return g_task_propagate_int (G_TASK (result), error);
765 }
766
767 static void
768 write_bytes_callback (GObject      *stream,
769                       GAsyncResult *result,
770                       gpointer      user_data)
771 {
772   GTask *task = user_data;
773   GError *error = NULL;
774   gssize nwrote;
775
776   nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream),
777                                          result, &error);
778   if (nwrote == -1)
779     g_task_return_error (task, error);
780   else
781     g_task_return_int (task, nwrote);
782   g_object_unref (task);
783 }
784
785 /**
786  * g_output_stream_write_bytes_async:
787  * @stream: A #GOutputStream.
788  * @bytes: The bytes to write
789  * @io_priority: the io priority of the request.
790  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
791  * @callback: (scope async): callback to call when the request is satisfied
792  * @user_data: (closure): the data to pass to callback function
793  *
794  * This function is similar to g_output_stream_write_async(), but
795  * takes a #GBytes as input.  Due to the refcounted nature of #GBytes,
796  * this allows the stream to avoid taking a copy of the data.
797  *
798  * However, note that this function <emphasis>may</emphasis> still
799  * perform partial writes, just like g_output_stream_write_async().
800  * If that occurs, to continue writing, you will need to create a new
801  * #GBytes containing just the remaining bytes, using
802  * g_bytes_new_from_bytes().  Passing the same #GBytes instance
803  * multiple times potentially can result in duplicated data in the
804  * output stream.
805  *
806  * For the synchronous, blocking version of this function, see
807  * g_output_stream_write_bytes().
808  **/
809 void
810 g_output_stream_write_bytes_async (GOutputStream       *stream,
811                                    GBytes              *bytes,
812                                    int                  io_priority,
813                                    GCancellable        *cancellable,
814                                    GAsyncReadyCallback  callback,
815                                    gpointer             user_data)
816 {
817   GTask *task;
818   gsize size;
819   gconstpointer data;
820
821   data = g_bytes_get_data (bytes, &size);
822
823   task = g_task_new (stream, cancellable, callback, user_data);
824   g_task_set_task_data (task, g_bytes_ref (bytes),
825                         (GDestroyNotify) g_bytes_unref);
826
827   g_output_stream_write_async (stream,
828                                data, size,
829                                io_priority,
830                                cancellable,
831                                write_bytes_callback,
832                                task);
833 }
834
835 /**
836  * g_output_stream_write_bytes_finish:
837  * @stream: a #GOutputStream.
838  * @result: a #GAsyncResult.
839  * @error: a #GError location to store the error occurring, or %NULL to
840  * ignore.
841  *
842  * Finishes a stream write-from-#GBytes operation.
843  *
844  * Returns: a #gssize containing the number of bytes written to the stream.
845  **/
846 gssize
847 g_output_stream_write_bytes_finish (GOutputStream  *stream,
848                                     GAsyncResult   *result,
849                                     GError        **error)
850 {
851   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
852   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
853
854   return g_task_propagate_int (G_TASK (result), error);
855 }
856
857 static void
858 async_ready_splice_callback_wrapper (GObject      *source_object,
859                                      GAsyncResult *res,
860                                      gpointer     _data)
861 {
862   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
863   GOutputStreamClass *class;
864   GTask *task = _data;
865   gssize nspliced;
866   GError *error = NULL;
867
868   g_output_stream_clear_pending (stream);
869   
870   if (g_async_result_legacy_propagate_error (res, &error))
871     nspliced = -1;
872   else
873     {
874       class = G_OUTPUT_STREAM_GET_CLASS (stream);
875       nspliced = class->splice_finish (stream, res, &error);
876     }
877
878   if (nspliced >= 0)
879     g_task_return_int (task, nspliced);
880   else
881     g_task_return_error (task, error);
882   g_object_unref (task);
883 }
884
885 /**
886  * g_output_stream_splice_async:
887  * @stream: a #GOutputStream.
888  * @source: a #GInputStream. 
889  * @flags: a set of #GOutputStreamSpliceFlags.
890  * @io_priority: the io priority of the request.
891  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
892  * @callback: (scope async): a #GAsyncReadyCallback. 
893  * @user_data: (closure): user data passed to @callback.
894  * 
895  * Splices a stream asynchronously.
896  * When the operation is finished @callback will be called.
897  * You can then call g_output_stream_splice_finish() to get the 
898  * result of the operation.
899  *
900  * For the synchronous, blocking version of this function, see 
901  * g_output_stream_splice().
902  **/
903 void
904 g_output_stream_splice_async (GOutputStream            *stream,
905                               GInputStream             *source,
906                               GOutputStreamSpliceFlags  flags,
907                               int                       io_priority,
908                               GCancellable             *cancellable,
909                               GAsyncReadyCallback       callback,
910                               gpointer                  user_data)
911 {
912   GOutputStreamClass *class;
913   GTask *task;
914   GError *error = NULL;
915
916   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
917   g_return_if_fail (G_IS_INPUT_STREAM (source));
918
919   task = g_task_new (stream, cancellable, callback, user_data);
920   g_task_set_source_tag (task, g_output_stream_splice_async);
921   g_task_set_priority (task, io_priority);
922   g_task_set_task_data (task, g_object_ref (source), g_object_unref);
923
924   if (g_input_stream_is_closed (source))
925     {
926       g_task_return_new_error (task,
927                                G_IO_ERROR, G_IO_ERROR_CLOSED,
928                                _("Source stream is already closed"));
929       g_object_unref (task);
930       return;
931     }
932   
933   if (!g_output_stream_set_pending (stream, &error))
934     {
935       g_task_return_error (task, error);
936       g_object_unref (task);
937       return;
938     }
939
940   class = G_OUTPUT_STREAM_GET_CLASS (stream);
941
942   class->splice_async (stream, source, flags, io_priority, cancellable,
943                        async_ready_splice_callback_wrapper, task);
944 }
945
946 /**
947  * g_output_stream_splice_finish:
948  * @stream: a #GOutputStream.
949  * @result: a #GAsyncResult.
950  * @error: a #GError location to store the error occurring, or %NULL to 
951  * ignore.
952  *
953  * Finishes an asynchronous stream splice operation.
954  * 
955  * Returns: a #gssize of the number of bytes spliced. Note that if the
956  *     number of bytes spliced is greater than %G_MAXSSIZE, then that
957  *     will be returned, and there is no way to determine the actual
958  *     number of bytes spliced.
959  **/
960 gssize
961 g_output_stream_splice_finish (GOutputStream  *stream,
962                                GAsyncResult   *result,
963                                GError        **error)
964 {
965   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
966   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
967   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_splice_async), FALSE);
968
969   /* @result is always the GTask created by g_output_stream_splice_async();
970    * we called class->splice_finish() from async_ready_splice_callback_wrapper.
971    */
972   return g_task_propagate_int (G_TASK (result), error);
973 }
974
975 static void
976 async_ready_flush_callback_wrapper (GObject      *source_object,
977                                     GAsyncResult *res,
978                                     gpointer      user_data)
979 {
980   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
981   GOutputStreamClass *class;
982   GTask *task = user_data;
983   gboolean flushed;
984   GError *error = NULL;
985
986   g_output_stream_clear_pending (stream);
987   
988   if (g_async_result_legacy_propagate_error (res, &error))
989     flushed = FALSE;
990   else
991     {
992       class = G_OUTPUT_STREAM_GET_CLASS (stream);
993       flushed = class->flush_finish (stream, res, &error);
994     }
995
996   if (flushed)
997     g_task_return_boolean (task, TRUE);
998   else
999     g_task_return_error (task, error);
1000   g_object_unref (task);
1001 }
1002
1003 /**
1004  * g_output_stream_flush_async:
1005  * @stream: a #GOutputStream.
1006  * @io_priority: the io priority of the request.
1007  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
1008  * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied
1009  * @user_data: (closure): the data to pass to callback function
1010  * 
1011  * Forces an asynchronous write of all user-space buffered data for
1012  * the given @stream.
1013  * For behaviour details see g_output_stream_flush().
1014  *
1015  * When the operation is finished @callback will be 
1016  * called. You can then call g_output_stream_flush_finish() to get the 
1017  * result of the operation.
1018  **/
1019 void
1020 g_output_stream_flush_async (GOutputStream       *stream,
1021                              int                  io_priority,
1022                              GCancellable        *cancellable,
1023                              GAsyncReadyCallback  callback,
1024                              gpointer             user_data)
1025 {
1026   GOutputStreamClass *class;
1027   GTask *task;
1028   GError *error = NULL;
1029
1030   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1031
1032   task = g_task_new (stream, cancellable, callback, user_data);
1033   g_task_set_source_tag (task, g_output_stream_flush_async);
1034   g_task_set_priority (task, io_priority);
1035
1036   if (!g_output_stream_set_pending (stream, &error))
1037     {
1038       g_task_return_error (task, error);
1039       g_object_unref (task);
1040       return;
1041     }
1042
1043   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1044   
1045   if (class->flush_async == NULL)
1046     {
1047       g_task_return_boolean (task, TRUE);
1048       g_object_unref (task);
1049       return;
1050     }
1051       
1052   class->flush_async (stream, io_priority, cancellable,
1053                       async_ready_flush_callback_wrapper, task);
1054 }
1055
1056 /**
1057  * g_output_stream_flush_finish:
1058  * @stream: a #GOutputStream.
1059  * @result: a GAsyncResult.
1060  * @error: a #GError location to store the error occurring, or %NULL to 
1061  * ignore.
1062  * 
1063  * Finishes flushing an output stream.
1064  * 
1065  * Returns: %TRUE if flush operation succeeded, %FALSE otherwise.
1066  **/
1067 gboolean
1068 g_output_stream_flush_finish (GOutputStream  *stream,
1069                               GAsyncResult   *result,
1070                               GError        **error)
1071 {
1072   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1073   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1074   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_flush_async), FALSE);
1075
1076   /* @result is always the GTask created by g_output_stream_flush_async();
1077    * we called class->flush_finish() from async_ready_flush_callback_wrapper.
1078    */
1079   return g_task_propagate_boolean (G_TASK (result), error);
1080 }
1081
1082
1083 static void
1084 async_ready_close_callback_wrapper (GObject      *source_object,
1085                                     GAsyncResult *res,
1086                                     gpointer      user_data)
1087 {
1088   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1089   GOutputStreamClass *class;
1090   GTask *task = user_data;
1091   GError *error = g_task_get_task_data (task);
1092
1093   stream->priv->closing = FALSE;
1094   stream->priv->closed = TRUE;
1095
1096   if (!error && !g_async_result_legacy_propagate_error (res, &error))
1097     {
1098       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1099
1100       class->close_finish (stream, res,
1101                            error ? NULL : &error);
1102     }
1103
1104   if (error != NULL)
1105     g_task_return_error (task, error);
1106   else
1107     g_task_return_boolean (task, TRUE);
1108   g_object_unref (task);
1109 }
1110
1111 static void
1112 async_ready_close_flushed_callback_wrapper (GObject      *source_object,
1113                                             GAsyncResult *res,
1114                                             gpointer      user_data)
1115 {
1116   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1117   GOutputStreamClass *class;
1118   GTask *task = user_data;
1119   GError *error = NULL;
1120
1121   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1122
1123   if (!g_async_result_legacy_propagate_error (res, &error))
1124     {
1125       class->flush_finish (stream, res, &error);
1126     }
1127
1128   /* propagate the possible error */
1129   if (error)
1130     g_task_set_task_data (task, error, NULL);
1131
1132   /* we still close, even if there was a flush error */
1133   class->close_async (stream,
1134                       g_task_get_priority (task),
1135                       g_task_get_cancellable (task),
1136                       async_ready_close_callback_wrapper, task);
1137 }
1138
1139 static void
1140 real_close_async_cb (GObject      *source_object,
1141                      GAsyncResult *res,
1142                      gpointer      user_data)
1143 {
1144   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1145   GTask *task = user_data;
1146   GError *error = NULL;
1147   gboolean ret;
1148
1149   g_output_stream_clear_pending (stream);
1150
1151   ret = g_output_stream_internal_close_finish (stream, res, &error);
1152
1153   if (error != NULL)
1154     g_task_return_error (task, error);
1155   else
1156     g_task_return_boolean (task, ret);
1157
1158   g_object_unref (task);
1159 }
1160
1161 /**
1162  * g_output_stream_close_async:
1163  * @stream: A #GOutputStream.
1164  * @io_priority: the io priority of the request.
1165  * @cancellable: (allow-none): optional cancellable object
1166  * @callback: (scope async): callback to call when the request is satisfied
1167  * @user_data: (closure): the data to pass to callback function
1168  *
1169  * Requests an asynchronous close of the stream, releasing resources 
1170  * related to it. When the operation is finished @callback will be 
1171  * called. You can then call g_output_stream_close_finish() to get 
1172  * the result of the operation.
1173  *
1174  * For behaviour details see g_output_stream_close().
1175  *
1176  * The asyncronous methods have a default fallback that uses threads 
1177  * to implement asynchronicity, so they are optional for inheriting 
1178  * classes. However, if you override one you must override all.
1179  **/
1180 void
1181 g_output_stream_close_async (GOutputStream       *stream,
1182                              int                  io_priority,
1183                              GCancellable        *cancellable,
1184                              GAsyncReadyCallback  callback,
1185                              gpointer             user_data)
1186 {
1187   GTask *task;
1188   GError *error = NULL;
1189
1190   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1191   
1192   task = g_task_new (stream, cancellable, callback, user_data);
1193   g_task_set_source_tag (task, g_output_stream_close_async);
1194   g_task_set_priority (task, io_priority);
1195
1196   if (!g_output_stream_set_pending (stream, &error))
1197     {
1198       g_task_return_error (task, error);
1199       g_object_unref (task);
1200       return;
1201     }
1202
1203   g_output_stream_internal_close_async (stream, io_priority, cancellable,
1204                                         real_close_async_cb, task);
1205 }
1206
1207 /* Must always be called inside
1208  * g_output_stream_set_pending()/g_output_stream_clear_pending().
1209  */
1210 void
1211 g_output_stream_internal_close_async (GOutputStream       *stream,
1212                                       int                  io_priority,
1213                                       GCancellable        *cancellable,
1214                                       GAsyncReadyCallback  callback,
1215                                       gpointer             user_data)
1216 {
1217   GOutputStreamClass *class;
1218   GTask *task;
1219
1220   task = g_task_new (stream, cancellable, callback, user_data);
1221   g_task_set_source_tag (task, g_output_stream_internal_close_async);
1222   g_task_set_priority (task, io_priority);
1223
1224   if (stream->priv->closed)
1225     {
1226       g_task_return_boolean (task, TRUE);
1227       g_object_unref (task);
1228       return;
1229     }
1230
1231   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1232   stream->priv->closing = TRUE;
1233
1234   /* Call close_async directly if there is no need to flush, or if the flush
1235      can be done sync (in the output stream async close thread) */
1236   if (class->flush_async == NULL ||
1237       (class->flush_async == g_output_stream_real_flush_async &&
1238        (class->flush == NULL || class->close_async == g_output_stream_real_close_async)))
1239     {
1240       class->close_async (stream, io_priority, cancellable,
1241                           async_ready_close_callback_wrapper, task);
1242     }
1243   else
1244     {
1245       /* First do an async flush, then do the async close in the callback
1246          wrapper (see async_ready_close_flushed_callback_wrapper) */
1247       class->flush_async (stream, io_priority, cancellable,
1248                           async_ready_close_flushed_callback_wrapper, task);
1249     }
1250 }
1251
1252 static gboolean
1253 g_output_stream_internal_close_finish (GOutputStream  *stream,
1254                                        GAsyncResult   *result,
1255                                        GError        **error)
1256 {
1257   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1258   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1259   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_internal_close_async), FALSE);
1260
1261   return g_task_propagate_boolean (G_TASK (result), error);
1262 }
1263
1264 /**
1265  * g_output_stream_close_finish:
1266  * @stream: a #GOutputStream.
1267  * @result: a #GAsyncResult.
1268  * @error: a #GError location to store the error occurring, or %NULL to 
1269  * ignore.
1270  * 
1271  * Closes an output stream.
1272  * 
1273  * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
1274  **/
1275 gboolean
1276 g_output_stream_close_finish (GOutputStream  *stream,
1277                               GAsyncResult   *result,
1278                               GError        **error)
1279 {
1280   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1281   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1282   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_close_async), FALSE);
1283
1284   /* @result is always the GTask created by g_output_stream_close_async();
1285    * we called class->close_finish() from async_ready_close_callback_wrapper.
1286    */
1287   return g_task_propagate_boolean (G_TASK (result), error);
1288 }
1289
1290 /**
1291  * g_output_stream_is_closed:
1292  * @stream: a #GOutputStream.
1293  * 
1294  * Checks if an output stream has already been closed.
1295  * 
1296  * Returns: %TRUE if @stream is closed. %FALSE otherwise. 
1297  **/
1298 gboolean
1299 g_output_stream_is_closed (GOutputStream *stream)
1300 {
1301   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1302   
1303   return stream->priv->closed;
1304 }
1305
1306 /**
1307  * g_output_stream_is_closing:
1308  * @stream: a #GOutputStream.
1309  *
1310  * Checks if an output stream is being closed. This can be
1311  * used inside e.g. a flush implementation to see if the
1312  * flush (or other i/o operation) is called from within
1313  * the closing operation.
1314  *
1315  * Returns: %TRUE if @stream is being closed. %FALSE otherwise.
1316  *
1317  * Since: 2.24
1318  **/
1319 gboolean
1320 g_output_stream_is_closing (GOutputStream *stream)
1321 {
1322   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1323
1324   return stream->priv->closing;
1325 }
1326
1327 /**
1328  * g_output_stream_has_pending:
1329  * @stream: a #GOutputStream.
1330  * 
1331  * Checks if an ouput stream has pending actions.
1332  * 
1333  * Returns: %TRUE if @stream has pending actions. 
1334  **/
1335 gboolean
1336 g_output_stream_has_pending (GOutputStream *stream)
1337 {
1338   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1339   
1340   return stream->priv->pending;
1341 }
1342
1343 /**
1344  * g_output_stream_set_pending:
1345  * @stream: a #GOutputStream.
1346  * @error: a #GError location to store the error occurring, or %NULL to 
1347  * ignore.
1348  * 
1349  * Sets @stream to have actions pending. If the pending flag is
1350  * already set or @stream is closed, it will return %FALSE and set
1351  * @error.
1352  *
1353  * Return value: %TRUE if pending was previously unset and is now set.
1354  **/
1355 gboolean
1356 g_output_stream_set_pending (GOutputStream *stream,
1357                              GError **error)
1358 {
1359   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1360   
1361   if (stream->priv->closed)
1362     {
1363       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1364                            _("Stream is already closed"));
1365       return FALSE;
1366     }
1367   
1368   if (stream->priv->pending)
1369     {
1370       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1371                            /* Translators: This is an error you get if there is
1372                             * already an operation running against this stream when
1373                             * you try to start one */
1374                            _("Stream has outstanding operation"));
1375       return FALSE;
1376     }
1377   
1378   stream->priv->pending = TRUE;
1379   return TRUE;
1380 }
1381
1382 /**
1383  * g_output_stream_clear_pending:
1384  * @stream: output stream
1385  * 
1386  * Clears the pending flag on @stream.
1387  **/
1388 void
1389 g_output_stream_clear_pending (GOutputStream *stream)
1390 {
1391   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1392   
1393   stream->priv->pending = FALSE;
1394 }
1395
1396 /**
1397  * g_output_stream_async_write_is_via_threads:
1398  * @stream: a #GOutputStream.
1399  *
1400  * Checks if an ouput stream's write_async function uses threads.
1401  *
1402  * Returns: %TRUE if @stream's write_async function uses threads.
1403  **/
1404 gboolean
1405 g_output_stream_async_write_is_via_threads (GOutputStream *stream)
1406 {
1407   GOutputStreamClass *class;
1408
1409   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1410
1411   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1412
1413   return (class->write_async == g_output_stream_real_write_async &&
1414       !(G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
1415         g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
1416 }
1417
1418
1419 /********************************************
1420  *   Default implementation of async ops    *
1421  ********************************************/
1422
1423 typedef struct {
1424   const void         *buffer;
1425   gsize               count_requested;
1426   gssize              count_written;
1427 } WriteData;
1428
1429 static void
1430 free_write_data (WriteData *op)
1431 {
1432   g_slice_free (WriteData, op);
1433 }
1434
1435 static void
1436 write_async_thread (GTask        *task,
1437                     gpointer      source_object,
1438                     gpointer      task_data,
1439                     GCancellable *cancellable)
1440 {
1441   GOutputStream *stream = source_object;
1442   WriteData *op = task_data;
1443   GOutputStreamClass *class;
1444   GError *error = NULL;
1445   gssize count_written;
1446
1447   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1448   count_written = class->write_fn (stream, op->buffer, op->count_requested,
1449                                    cancellable, &error);
1450   if (count_written == -1)
1451     g_task_return_error (task, error);
1452   else
1453     g_task_return_int (task, count_written);
1454 }
1455
1456 static void write_async_pollable (GPollableOutputStream *stream,
1457                                   GTask                 *task);
1458
1459 static gboolean
1460 write_async_pollable_ready (GPollableOutputStream *stream,
1461                             gpointer               user_data)
1462 {
1463   GTask *task = user_data;
1464
1465   write_async_pollable (stream, task);
1466   return FALSE;
1467 }
1468
1469 static void
1470 write_async_pollable (GPollableOutputStream *stream,
1471                       GTask                 *task)
1472 {
1473   GError *error = NULL;
1474   WriteData *op = g_task_get_task_data (task);
1475   gssize count_written;
1476
1477   if (g_task_return_error_if_cancelled (task))
1478     return;
1479
1480   count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
1481     write_nonblocking (stream, op->buffer, op->count_requested, &error);
1482
1483   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1484     {
1485       GSource *source;
1486
1487       g_error_free (error);
1488
1489       source = g_pollable_output_stream_create_source (stream,
1490                                                        g_task_get_cancellable (task));
1491       g_task_attach_source (task, source,
1492                             (GSourceFunc) write_async_pollable_ready);
1493       g_source_unref (source);
1494       return;
1495     }
1496
1497   if (count_written == -1)
1498     g_task_return_error (task, error);
1499   else
1500     g_task_return_int (task, count_written);
1501 }
1502
1503 static void
1504 g_output_stream_real_write_async (GOutputStream       *stream,
1505                                   const void          *buffer,
1506                                   gsize                count,
1507                                   int                  io_priority,
1508                                   GCancellable        *cancellable,
1509                                   GAsyncReadyCallback  callback,
1510                                   gpointer             user_data)
1511 {
1512   GTask *task;
1513   WriteData *op;
1514
1515   op = g_slice_new0 (WriteData);
1516   task = g_task_new (stream, cancellable, callback, user_data);
1517   g_task_set_check_cancellable (task, FALSE);
1518   g_task_set_task_data (task, op, (GDestroyNotify) free_write_data);
1519   op->buffer = buffer;
1520   op->count_requested = count;
1521
1522   if (!g_output_stream_async_write_is_via_threads (stream))
1523     write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
1524   else
1525     g_task_run_in_thread (task, write_async_thread);
1526   g_object_unref (task);
1527 }
1528
1529 static gssize
1530 g_output_stream_real_write_finish (GOutputStream  *stream,
1531                                    GAsyncResult   *result,
1532                                    GError        **error)
1533 {
1534   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1535
1536   return g_task_propagate_int (G_TASK (result), error);
1537 }
1538
1539 typedef struct {
1540   GInputStream *source;
1541   GOutputStreamSpliceFlags flags;
1542   gssize n_read;
1543   gssize n_written;
1544   gsize bytes_copied;
1545   GError *error;
1546   guint8 *buffer;
1547 } SpliceData;
1548
1549 static void
1550 free_splice_data (SpliceData *op)
1551 {
1552   g_clear_pointer (&op->buffer, g_free);
1553   g_object_unref (op->source);
1554   g_clear_error (&op->error);
1555   g_free (op);
1556 }
1557
1558 static void
1559 real_splice_async_complete_cb (GTask *task)
1560 {
1561   SpliceData *op = g_task_get_task_data (task);
1562
1563   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE &&
1564       !g_input_stream_is_closed (op->source))
1565     return;
1566
1567   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET &&
1568       !g_output_stream_is_closed (g_task_get_source_object (task)))
1569     return;
1570
1571   if (op->error != NULL)
1572     {
1573       g_task_return_error (task, op->error);
1574       op->error = NULL;
1575     }
1576   else
1577     {
1578       g_task_return_int (task, op->bytes_copied);
1579     }
1580
1581   g_object_unref (task);
1582 }
1583
1584 static void
1585 real_splice_async_close_input_cb (GObject      *source,
1586                                   GAsyncResult *res,
1587                                   gpointer      user_data)
1588 {
1589   GTask *task = user_data;
1590
1591   g_input_stream_close_finish (G_INPUT_STREAM (source), res, NULL);
1592
1593   real_splice_async_complete_cb (task);
1594 }
1595
1596 static void
1597 real_splice_async_close_output_cb (GObject      *source,
1598                                    GAsyncResult *res,
1599                                    gpointer      user_data)
1600 {
1601   GTask *task = G_TASK (user_data);
1602   SpliceData *op = g_task_get_task_data (task);
1603   GError **error = (op->error == NULL) ? &op->error : NULL;
1604
1605   g_output_stream_internal_close_finish (G_OUTPUT_STREAM (source), res, error);
1606
1607   real_splice_async_complete_cb (task);
1608 }
1609
1610 static void
1611 real_splice_async_complete (GTask *task)
1612 {
1613   SpliceData *op = g_task_get_task_data (task);
1614   gboolean done = TRUE;
1615
1616   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
1617     {
1618       done = FALSE;
1619       g_input_stream_close_async (op->source, g_task_get_priority (task),
1620                                   g_task_get_cancellable (task),
1621                                   real_splice_async_close_input_cb, task);
1622     }
1623
1624   if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
1625     {
1626       done = FALSE;
1627       g_output_stream_internal_close_async (g_task_get_source_object (task),
1628                                             g_task_get_priority (task),
1629                                             g_task_get_cancellable (task),
1630                                             real_splice_async_close_output_cb,
1631                                             task);
1632     }
1633
1634   if (done)
1635     real_splice_async_complete_cb (task);
1636 }
1637
1638 static void real_splice_async_read_cb (GObject      *source,
1639                                        GAsyncResult *res,
1640                                        gpointer      user_data);
1641
1642 static void
1643 real_splice_async_write_cb (GObject      *source,
1644                             GAsyncResult *res,
1645                             gpointer      user_data)
1646 {
1647   GOutputStreamClass *class;
1648   GTask *task = G_TASK (user_data);
1649   SpliceData *op = g_task_get_task_data (task);
1650   gssize ret;
1651
1652   class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
1653
1654   ret = class->write_finish (G_OUTPUT_STREAM (source), res, &op->error);
1655
1656   if (ret == -1)
1657     {
1658       real_splice_async_complete (task);
1659       return;
1660     }
1661
1662   op->n_written += ret;
1663   op->bytes_copied += ret;
1664   if (op->bytes_copied > G_MAXSSIZE)
1665     op->bytes_copied = G_MAXSSIZE;
1666
1667   if (op->n_written < op->n_read)
1668     {
1669       class->write_async (g_task_get_source_object (task),
1670                           op->buffer + op->n_written,
1671                           op->n_read - op->n_written,
1672                           g_task_get_priority (task),
1673                           g_task_get_cancellable (task),
1674                           real_splice_async_write_cb, task);
1675       return;
1676     }
1677
1678   g_input_stream_read_async (op->source, op->buffer, 8192,
1679                              g_task_get_priority (task),
1680                              g_task_get_cancellable (task),
1681                              real_splice_async_read_cb, task);
1682 }
1683
1684 static void
1685 real_splice_async_read_cb (GObject      *source,
1686                            GAsyncResult *res,
1687                            gpointer      user_data)
1688 {
1689   GOutputStreamClass *class;
1690   GTask *task = G_TASK (user_data);
1691   SpliceData *op = g_task_get_task_data (task);
1692   gssize ret;
1693
1694   class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
1695
1696   ret = g_input_stream_read_finish (op->source, res, &op->error);
1697   if (ret == -1 || ret == 0)
1698     {
1699       real_splice_async_complete (task);
1700       return;
1701     }
1702
1703   op->n_read = ret;
1704   op->n_written = 0;
1705
1706   class->write_async (g_task_get_source_object (task), op->buffer,
1707                       op->n_read, g_task_get_priority (task),
1708                       g_task_get_cancellable (task),
1709                       real_splice_async_write_cb, task);
1710 }
1711
1712 static void
1713 splice_async_thread (GTask        *task,
1714                      gpointer      source_object,
1715                      gpointer      task_data,
1716                      GCancellable *cancellable)
1717 {
1718   GOutputStream *stream = source_object;
1719   SpliceData *op = task_data;
1720   GOutputStreamClass *class;
1721   GError *error = NULL;
1722   gssize bytes_copied;
1723
1724   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1725   
1726   bytes_copied = class->splice (stream,
1727                                 op->source,
1728                                 op->flags,
1729                                 cancellable,
1730                                 &error);
1731   if (bytes_copied == -1)
1732     g_task_return_error (task, error);
1733   else
1734     g_task_return_int (task, bytes_copied);
1735 }
1736
1737 static void
1738 g_output_stream_real_splice_async (GOutputStream             *stream,
1739                                    GInputStream              *source,
1740                                    GOutputStreamSpliceFlags   flags,
1741                                    int                        io_priority,
1742                                    GCancellable              *cancellable,
1743                                    GAsyncReadyCallback        callback,
1744                                    gpointer                   user_data)
1745 {
1746   GTask *task;
1747   SpliceData *op;
1748
1749   op = g_new0 (SpliceData, 1);
1750   task = g_task_new (stream, cancellable, callback, user_data);
1751   g_task_set_task_data (task, op, (GDestroyNotify)free_splice_data);
1752   op->flags = flags;
1753   op->source = g_object_ref (source);
1754
1755   if (g_input_stream_async_read_is_via_threads (source) &&
1756       g_output_stream_async_write_is_via_threads (stream))
1757     {
1758       g_task_run_in_thread (task, splice_async_thread);
1759       g_object_unref (task);
1760     }
1761   else
1762     {
1763       op->buffer = g_malloc (8192);
1764       g_input_stream_read_async (op->source, op->buffer, 8192,
1765                                  g_task_get_priority (task),
1766                                  g_task_get_cancellable (task),
1767                                  real_splice_async_read_cb, task);
1768     }
1769 }
1770
1771 static gssize
1772 g_output_stream_real_splice_finish (GOutputStream  *stream,
1773                                     GAsyncResult   *result,
1774                                     GError        **error)
1775 {
1776   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1777
1778   return g_task_propagate_int (G_TASK (result), error);
1779 }
1780
1781
1782 static void
1783 flush_async_thread (GTask        *task,
1784                     gpointer      source_object,
1785                     gpointer      task_data,
1786                     GCancellable *cancellable)
1787 {
1788   GOutputStream *stream = source_object;
1789   GOutputStreamClass *class;
1790   gboolean result;
1791   GError *error = NULL;
1792
1793   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1794   result = TRUE;
1795   if (class->flush)
1796     result = class->flush (stream, cancellable, &error);
1797
1798   if (result)
1799     g_task_return_boolean (task, TRUE);
1800   else
1801     g_task_return_error (task, error);
1802 }
1803
1804 static void
1805 g_output_stream_real_flush_async (GOutputStream       *stream,
1806                                   int                  io_priority,
1807                                   GCancellable        *cancellable,
1808                                   GAsyncReadyCallback  callback,
1809                                   gpointer             user_data)
1810 {
1811   GTask *task;
1812
1813   task = g_task_new (stream, cancellable, callback, user_data);
1814   g_task_set_priority (task, io_priority);
1815   g_task_run_in_thread (task, flush_async_thread);
1816   g_object_unref (task);
1817 }
1818
1819 static gboolean
1820 g_output_stream_real_flush_finish (GOutputStream  *stream,
1821                                    GAsyncResult   *result,
1822                                    GError        **error)
1823 {
1824   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1825
1826   return g_task_propagate_boolean (G_TASK (result), error);
1827 }
1828
1829 static void
1830 close_async_thread (GTask        *task,
1831                     gpointer      source_object,
1832                     gpointer      task_data,
1833                     GCancellable *cancellable)
1834 {
1835   GOutputStream *stream = source_object;
1836   GOutputStreamClass *class;
1837   GError *error = NULL;
1838   gboolean result = TRUE;
1839
1840   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1841
1842   /* Do a flush here if there is a flush function, and we did not have to do
1843    * an async flush before (see g_output_stream_close_async)
1844    */
1845   if (class->flush != NULL &&
1846       (class->flush_async == NULL ||
1847        class->flush_async == g_output_stream_real_flush_async))
1848     {
1849       result = class->flush (stream, cancellable, &error);
1850     }
1851
1852   /* Auto handling of cancelation disabled, and ignore
1853      cancellation, since we want to close things anyway, although
1854      possibly in a quick-n-dirty way. At least we never want to leak
1855      open handles */
1856
1857   if (class->close_fn)
1858     {
1859       /* Make sure to close, even if the flush failed (see sync close) */
1860       if (!result)
1861         class->close_fn (stream, cancellable, NULL);
1862       else
1863         result = class->close_fn (stream, cancellable, &error);
1864     }
1865
1866   if (result)
1867     g_task_return_boolean (task, TRUE);
1868   else
1869     g_task_return_error (task, error);
1870 }
1871
1872 static void
1873 g_output_stream_real_close_async (GOutputStream       *stream,
1874                                   int                  io_priority,
1875                                   GCancellable        *cancellable,
1876                                   GAsyncReadyCallback  callback,
1877                                   gpointer             user_data)
1878 {
1879   GTask *task;
1880
1881   task = g_task_new (stream, cancellable, callback, user_data);
1882   g_task_set_priority (task, io_priority);
1883   g_task_run_in_thread (task, close_async_thread);
1884   g_object_unref (task);
1885 }
1886
1887 static gboolean
1888 g_output_stream_real_close_finish (GOutputStream  *stream,
1889                                    GAsyncResult   *result,
1890                                    GError        **error)
1891 {
1892   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1893
1894   return g_task_propagate_boolean (G_TASK (result), error);
1895 }