GOutputStream: Split _close_async for internal use
[platform/upstream/glib.git] / gio / goutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include "config.h"
24 #include "goutputstream.h"
25 #include "gcancellable.h"
26 #include "gasyncresult.h"
27 #include "gtask.h"
28 #include "ginputstream.h"
29 #include "gioerror.h"
30 #include "gioprivate.h"
31 #include "glibintl.h"
32 #include "gpollableoutputstream.h"
33
34 /**
35  * SECTION:goutputstream
36  * @short_description: Base class for implementing streaming output
37  * @include: gio/gio.h
38  *
39  * #GOutputStream has functions to write to a stream (g_output_stream_write()),
40  * to close a stream (g_output_stream_close()) and to flush pending writes
41  * (g_output_stream_flush()). 
42  *
43  * To copy the content of an input stream to an output stream without 
44  * manually handling the reads and writes, use g_output_stream_splice(). 
45  *
46  * All of these functions have async variants too.
47  **/
48
/* Per-instance private state of a GOutputStream. */
struct _GOutputStreamPrivate {
  /* Set once g_output_stream_internal_close() has run; never reset in the
   * visible part of this file. */
  guint closed : 1;
  /* NOTE(review): presumably toggled by g_output_stream_set_pending() /
   * g_output_stream_clear_pending(); their bodies are outside this view. */
  guint pending : 1;
  /* TRUE only while g_output_stream_internal_close() is flushing/closing. */
  guint closing : 1;
  /* NOTE(review): not referenced in the visible part of the file — confirm
   * its role against the async close code. */
  GAsyncReadyCallback outstanding_callback;
};

/* Defines the abstract GOutputStream type on top of GObject, with the
 * private struct above attached to every instance. */
G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE (GOutputStream, g_output_stream, G_TYPE_OBJECT)
57
/* Forward declarations.
 *
 * The g_output_stream_real_*() functions are the default virtual method
 * implementations installed by g_output_stream_class_init().  The
 * g_output_stream_internal_close*() functions are file-private close
 * helpers. */
static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
                                                    GInputStream              *source,
                                                    GOutputStreamSpliceFlags   flags,
                                                    GCancellable              *cancellable,
                                                    GError                   **error);
static void     g_output_stream_real_write_async   (GOutputStream             *stream,
                                                    const void                *buffer,
                                                    gsize                      count,
                                                    int                        io_priority,
                                                    GCancellable              *cancellable,
                                                    GAsyncReadyCallback        callback,
                                                    gpointer                   data);
static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
                                                    GAsyncResult              *result,
                                                    GError                   **error);
static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
                                                    GInputStream              *source,
                                                    GOutputStreamSpliceFlags   flags,
                                                    int                        io_priority,
                                                    GCancellable              *cancellable,
                                                    GAsyncReadyCallback        callback,
                                                    gpointer                   data);
static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
                                                    GAsyncResult              *result,
                                                    GError                   **error);
static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
                                                    int                        io_priority,
                                                    GCancellable              *cancellable,
                                                    GAsyncReadyCallback        callback,
                                                    gpointer                   data);
static gboolean g_output_stream_real_flush_finish  (GOutputStream             *stream,
                                                    GAsyncResult              *result,
                                                    GError                   **error);
static void     g_output_stream_real_close_async   (GOutputStream             *stream,
                                                    int                        io_priority,
                                                    GCancellable              *cancellable,
                                                    GAsyncReadyCallback        callback,
                                                    gpointer                   data);
static gboolean g_output_stream_real_close_finish  (GOutputStream             *stream,
                                                    GAsyncResult              *result,
                                                    GError                   **error);
/* Synchronous flush-then-close helper; must be called with the stream's
 * pending flag held (see its definition). */
static gboolean g_output_stream_internal_close     (GOutputStream             *stream,
                                                    GCancellable              *cancellable,
                                                    GError                   **error);
static void     g_output_stream_internal_close_async (GOutputStream           *stream,
                                                      int                      io_priority,
                                                      GCancellable            *cancellable,
                                                      GAsyncReadyCallback      callback,
                                                      gpointer                 data);
static gboolean g_output_stream_internal_close_finish (GOutputStream          *stream,
                                                       GAsyncResult           *result,
                                                       GError                **error);
110
111 static void
112 g_output_stream_dispose (GObject *object)
113 {
114   GOutputStream *stream;
115
116   stream = G_OUTPUT_STREAM (object);
117   
118   if (!stream->priv->closed)
119     g_output_stream_close (stream, NULL, NULL);
120
121   G_OBJECT_CLASS (g_output_stream_parent_class)->dispose (object);
122 }
123
124 static void
125 g_output_stream_class_init (GOutputStreamClass *klass)
126 {
127   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
128
129   gobject_class->dispose = g_output_stream_dispose;
130
131   klass->splice = g_output_stream_real_splice;
132   
133   klass->write_async = g_output_stream_real_write_async;
134   klass->write_finish = g_output_stream_real_write_finish;
135   klass->splice_async = g_output_stream_real_splice_async;
136   klass->splice_finish = g_output_stream_real_splice_finish;
137   klass->flush_async = g_output_stream_real_flush_async;
138   klass->flush_finish = g_output_stream_real_flush_finish;
139   klass->close_async = g_output_stream_real_close_async;
140   klass->close_finish = g_output_stream_real_close_finish;
141 }
142
143 static void
144 g_output_stream_init (GOutputStream *stream)
145 {
146   stream->priv = g_output_stream_get_instance_private (stream);
147 }
148
149 /**
150  * g_output_stream_write:
151  * @stream: a #GOutputStream.
152  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
153  * @count: the number of bytes to write
154  * @cancellable: (allow-none): optional cancellable object
155  * @error: location to store the error occurring, or %NULL to ignore
156  *
157  * Tries to write @count bytes from @buffer into the stream. Will block
158  * during the operation.
159  * 
160  * If count is 0, returns 0 and does nothing. A value of @count
161  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
162  *
163  * On success, the number of bytes written to the stream is returned.
164  * It is not an error if this is not the same as the requested size, as it
165  * can happen e.g. on a partial I/O error, or if there is not enough
166  * storage in the stream. All writes block until at least one byte
167  * is written or an error occurs; 0 is never returned (unless
168  * @count is 0).
169  * 
170  * If @cancellable is not %NULL, then the operation can be cancelled by
171  * triggering the cancellable object from another thread. If the operation
172  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
173  * operation was partially finished when the operation was cancelled the
174  * partial result will be returned, without an error.
175  *
176  * On error -1 is returned and @error is set accordingly.
177  * 
178  * Virtual: write_fn
179  *
180  * Return value: Number of bytes written, or -1 on error
181  **/
182 gssize
183 g_output_stream_write (GOutputStream  *stream,
184                        const void     *buffer,
185                        gsize           count,
186                        GCancellable   *cancellable,
187                        GError        **error)
188 {
189   GOutputStreamClass *class;
190   gssize res;
191
192   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
193   g_return_val_if_fail (buffer != NULL, 0);
194
195   if (count == 0)
196     return 0;
197   
198   if (((gssize) count) < 0)
199     {
200       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
201                    _("Too large count value passed to %s"), G_STRFUNC);
202       return -1;
203     }
204
205   class = G_OUTPUT_STREAM_GET_CLASS (stream);
206
207   if (class->write_fn == NULL) 
208     {
209       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
210                            _("Output stream doesn't implement write"));
211       return -1;
212     }
213   
214   if (!g_output_stream_set_pending (stream, error))
215     return -1;
216   
217   if (cancellable)
218     g_cancellable_push_current (cancellable);
219   
220   res = class->write_fn (stream, buffer, count, cancellable, error);
221   
222   if (cancellable)
223     g_cancellable_pop_current (cancellable);
224   
225   g_output_stream_clear_pending (stream);
226
227   return res; 
228 }
229
230 /**
231  * g_output_stream_write_all:
232  * @stream: a #GOutputStream.
233  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
234  * @count: the number of bytes to write
235  * @bytes_written: (out): location to store the number of bytes that was 
236  *     written to the stream
237  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
238  * @error: location to store the error occurring, or %NULL to ignore
239  *
240  * Tries to write @count bytes from @buffer into the stream. Will block
241  * during the operation.
242  * 
243  * This function is similar to g_output_stream_write(), except it tries to
244  * write as many bytes as requested, only stopping on an error.
245  *
246  * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
247  * is set to @count.
248  * 
249  * If there is an error during the operation %FALSE is returned and @error
250  * is set to indicate the error status, @bytes_written is updated to contain
251  * the number of bytes written into the stream before the error occurred.
252  *
253  * Return value: %TRUE on success, %FALSE if there was an error
254  **/
255 gboolean
256 g_output_stream_write_all (GOutputStream  *stream,
257                            const void     *buffer,
258                            gsize           count,
259                            gsize          *bytes_written,
260                            GCancellable   *cancellable,
261                            GError        **error)
262 {
263   gsize _bytes_written;
264   gssize res;
265
266   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
267   g_return_val_if_fail (buffer != NULL, FALSE);
268
269   _bytes_written = 0;
270   while (_bytes_written < count)
271     {
272       res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
273                                    cancellable, error);
274       if (res == -1)
275         {
276           if (bytes_written)
277             *bytes_written = _bytes_written;
278           return FALSE;
279         }
280       
281       if (res == 0)
282         g_warning ("Write returned zero without error");
283
284       _bytes_written += res;
285     }
286   
287   if (bytes_written)
288     *bytes_written = _bytes_written;
289
290   return TRUE;
291 }
292
293 /**
294  * g_output_stream_write_bytes:
295  * @stream: a #GOutputStream.
296  * @bytes: the #GBytes to write
297  * @cancellable: (allow-none): optional cancellable object
298  * @error: location to store the error occurring, or %NULL to ignore
299  *
300  * Tries to write the data from @bytes into the stream. Will block
301  * during the operation.
302  *
303  * If @bytes is 0-length, returns 0 and does nothing. A #GBytes larger
304  * than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
305  *
306  * On success, the number of bytes written to the stream is returned.
307  * It is not an error if this is not the same as the requested size, as it
308  * can happen e.g. on a partial I/O error, or if there is not enough
309  * storage in the stream. All writes block until at least one byte
310  * is written or an error occurs; 0 is never returned (unless
311  * the size of @bytes is 0).
312  *
313  * If @cancellable is not %NULL, then the operation can be cancelled by
314  * triggering the cancellable object from another thread. If the operation
315  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
316  * operation was partially finished when the operation was cancelled the
317  * partial result will be returned, without an error.
318  *
319  * On error -1 is returned and @error is set accordingly.
320  *
321  * Return value: Number of bytes written, or -1 on error
322  **/
323 gssize
324 g_output_stream_write_bytes (GOutputStream  *stream,
325                              GBytes         *bytes,
326                              GCancellable   *cancellable,
327                              GError        **error)
328 {
329   gsize size;
330   gconstpointer data;
331
332   data = g_bytes_get_data (bytes, &size);
333
334   return g_output_stream_write (stream,
335                                 data, size,
336                                 cancellable,
337                                 error);
338 }
339
340 /**
341  * g_output_stream_flush:
342  * @stream: a #GOutputStream.
343  * @cancellable: (allow-none): optional cancellable object
344  * @error: location to store the error occurring, or %NULL to ignore
345  *
346  * Forces a write of all user-space buffered data for the given
347  * @stream. Will block during the operation. Closing the stream will
348  * implicitly cause a flush.
349  *
350  * This function is optional for inherited classes.
351  * 
352  * If @cancellable is not %NULL, then the operation can be cancelled by
353  * triggering the cancellable object from another thread. If the operation
354  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
355  *
356  * Return value: %TRUE on success, %FALSE on error
357  **/
358 gboolean
359 g_output_stream_flush (GOutputStream  *stream,
360                        GCancellable   *cancellable,
361                        GError        **error)
362 {
363   GOutputStreamClass *class;
364   gboolean res;
365
366   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
367
368   if (!g_output_stream_set_pending (stream, error))
369     return FALSE;
370   
371   class = G_OUTPUT_STREAM_GET_CLASS (stream);
372
373   res = TRUE;
374   if (class->flush)
375     {
376       if (cancellable)
377         g_cancellable_push_current (cancellable);
378       
379       res = class->flush (stream, cancellable, error);
380       
381       if (cancellable)
382         g_cancellable_pop_current (cancellable);
383     }
384   
385   g_output_stream_clear_pending (stream);
386
387   return res;
388 }
389
390 /**
391  * g_output_stream_splice:
392  * @stream: a #GOutputStream.
393  * @source: a #GInputStream.
394  * @flags: a set of #GOutputStreamSpliceFlags.
395  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
396  * @error: a #GError location to store the error occurring, or %NULL to
397  * ignore.
398  *
399  * Splices an input stream into an output stream.
400  *
401  * Returns: a #gssize containing the size of the data spliced, or
402  *     -1 if an error occurred. Note that if the number of bytes
403  *     spliced is greater than %G_MAXSSIZE, then that will be
404  *     returned, and there is no way to determine the actual number
405  *     of bytes spliced.
406  **/
407 gssize
408 g_output_stream_splice (GOutputStream             *stream,
409                         GInputStream              *source,
410                         GOutputStreamSpliceFlags   flags,
411                         GCancellable              *cancellable,
412                         GError                   **error)
413 {
414   GOutputStreamClass *class;
415   gssize bytes_copied;
416
417   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
418   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
419
420   if (g_input_stream_is_closed (source))
421     {
422       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
423                            _("Source stream is already closed"));
424       return -1;
425     }
426
427   if (!g_output_stream_set_pending (stream, error))
428     return -1;
429
430   class = G_OUTPUT_STREAM_GET_CLASS (stream);
431
432   if (cancellable)
433     g_cancellable_push_current (cancellable);
434
435   bytes_copied = class->splice (stream, source, flags, cancellable, error);
436
437   if (cancellable)
438     g_cancellable_pop_current (cancellable);
439
440   g_output_stream_clear_pending (stream);
441
442   return bytes_copied;
443 }
444
445 static gssize
446 g_output_stream_real_splice (GOutputStream             *stream,
447                              GInputStream              *source,
448                              GOutputStreamSpliceFlags   flags,
449                              GCancellable              *cancellable,
450                              GError                   **error)
451 {
452   GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
453   gssize n_read, n_written;
454   gsize bytes_copied;
455   char buffer[8192], *p;
456   gboolean res;
457
458   bytes_copied = 0;
459   if (class->write_fn == NULL)
460     {
461       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
462                            _("Output stream doesn't implement write"));
463       res = FALSE;
464       goto notsupported;
465     }
466
467   res = TRUE;
468   do
469     {
470       n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
471       if (n_read == -1)
472         {
473           res = FALSE;
474           break;
475         }
476
477       if (n_read == 0)
478         break;
479
480       p = buffer;
481       while (n_read > 0)
482         {
483           n_written = class->write_fn (stream, p, n_read, cancellable, error);
484           if (n_written == -1)
485             {
486               res = FALSE;
487               break;
488             }
489
490           p += n_written;
491           n_read -= n_written;
492           bytes_copied += n_written;
493         }
494
495       if (bytes_copied > G_MAXSSIZE)
496         bytes_copied = G_MAXSSIZE;
497     }
498   while (res);
499
500  notsupported:
501   if (!res)
502     error = NULL; /* Ignore further errors */
503
504   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
505     {
506       /* Don't care about errors in source here */
507       g_input_stream_close (source, cancellable, NULL);
508     }
509
510   if (flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
511     {
512       /* But write errors on close are bad! */
513       res = g_output_stream_internal_close (stream, cancellable, error);
514     }
515
516   if (res)
517     return bytes_copied;
518
519   return -1;
520 }
521
522 /* Must always be called inside
523  * g_output_stream_set_pending()/g_output_stream_clear_pending(). */
524 static gboolean
525 g_output_stream_internal_close (GOutputStream  *stream,
526                                 GCancellable   *cancellable,
527                                 GError        **error)
528 {
529   GOutputStreamClass *class;
530   gboolean res;
531
532   if (stream->priv->closed)
533     return TRUE;
534
535   class = G_OUTPUT_STREAM_GET_CLASS (stream);
536
537   stream->priv->closing = TRUE;
538
539   if (cancellable)
540     g_cancellable_push_current (cancellable);
541
542   if (class->flush)
543     res = class->flush (stream, cancellable, error);
544   else
545     res = TRUE;
546
547   if (!res)
548     {
549       /* flushing caused the error that we want to return,
550        * but we still want to close the underlying stream if possible
551        */
552       if (class->close_fn)
553         class->close_fn (stream, cancellable, NULL);
554     }
555   else
556     {
557       res = TRUE;
558       if (class->close_fn)
559         res = class->close_fn (stream, cancellable, error);
560     }
561
562   if (cancellable)
563     g_cancellable_pop_current (cancellable);
564
565   stream->priv->closing = FALSE;
566   stream->priv->closed = TRUE;
567
568   return res;
569 }
570
571 /**
572  * g_output_stream_close:
573  * @stream: A #GOutputStream.
574  * @cancellable: (allow-none): optional cancellable object
575  * @error: location to store the error occurring, or %NULL to ignore
576  *
577  * Closes the stream, releasing resources related to it.
578  *
579  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
580  * Closing a stream multiple times will not return an error.
581  *
582  * Closing a stream will automatically flush any outstanding buffers in the
583  * stream.
584  *
585  * Streams will be automatically closed when the last reference
586  * is dropped, but you might want to call this function to make sure 
587  * resources are released as early as possible.
588  *
589  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
590  * open after the stream is closed. See the documentation for the individual
591  * stream for details.
592  *
593  * On failure the first error that happened will be reported, but the close
594  * operation will finish as much as possible. A stream that failed to
595  * close will still return %G_IO_ERROR_CLOSED for all operations. Still, it
596  * is important to check and report the error to the user, otherwise
597  * there might be a loss of data as all data might not be written.
598  * 
599  * If @cancellable is not %NULL, then the operation can be cancelled by
600  * triggering the cancellable object from another thread. If the operation
601  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
602  * Cancelling a close will still leave the stream closed, but some streams
603  * can use a faster close that doesn't block to e.g. check errors. On
604  * cancellation (as with any error) there is no guarantee that all written
605  * data will reach the target. 
606  *
607  * Return value: %TRUE on success, %FALSE on failure
608  **/
609 gboolean
610 g_output_stream_close (GOutputStream  *stream,
611                        GCancellable   *cancellable,
612                        GError        **error)
613 {
614   gboolean res;
615
616   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
617
618   if (stream->priv->closed)
619     return TRUE;
620
621   if (!g_output_stream_set_pending (stream, error))
622     return FALSE;
623
624   res = g_output_stream_internal_close (stream, cancellable, error);
625
626   g_output_stream_clear_pending (stream);
627   
628   return res;
629 }
630
631 static void
632 async_ready_write_callback_wrapper (GObject      *source_object,
633                                     GAsyncResult *res,
634                                     gpointer      user_data)
635 {
636   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
637   GOutputStreamClass *class;
638   GTask *task = user_data;
639   gssize nwrote;
640   GError *error = NULL;
641
642   g_output_stream_clear_pending (stream);
643   
644   if (g_async_result_legacy_propagate_error (res, &error))
645     nwrote = -1;
646   else
647     {
648       class = G_OUTPUT_STREAM_GET_CLASS (stream);
649       nwrote = class->write_finish (stream, res, &error);
650     }
651
652   if (nwrote >= 0)
653     g_task_return_int (task, nwrote);
654   else
655     g_task_return_error (task, error);
656   g_object_unref (task);
657 }
658
659 /**
660  * g_output_stream_write_async:
661  * @stream: A #GOutputStream.
662  * @buffer: (array length=count) (element-type guint8): the buffer containing the data to write. 
663  * @count: the number of bytes to write
664  * @io_priority: the io priority of the request.
665  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
666  * @callback: (scope async): callback to call when the request is satisfied
667  * @user_data: (closure): the data to pass to callback function
668  *
669  * Request an asynchronous write of @count bytes from @buffer into 
670  * the stream. When the operation is finished @callback will be called.
671  * You can then call g_output_stream_write_finish() to get the result of the 
672  * operation.
673  *
674  * During an async request no other sync and async calls are allowed, 
675  * and will result in %G_IO_ERROR_PENDING errors. 
676  *
677  * A value of @count larger than %G_MAXSSIZE will cause a 
678  * %G_IO_ERROR_INVALID_ARGUMENT error.
679  *
680  * On success, the number of bytes written will be passed to the
681  * @callback. It is not an error if this is not the same as the 
682  * requested size, as it can happen e.g. on a partial I/O error, 
683  * but generally we try to write as many bytes as requested. 
684  *
685  * You are guaranteed that this method will never fail with
686  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
687  * method will just wait until this changes.
688  *
689  * Any outstanding I/O request with higher priority (lower numerical 
690  * value) will be executed before an outstanding request with lower 
691  * priority. Default priority is %G_PRIORITY_DEFAULT.
692  *
693  * The asynchronous methods have a default fallback that uses threads 
694  * to implement asynchronicity, so they are optional for inheriting 
695  * classes. However, if you override one you must override all.
696  *
697  * For the synchronous, blocking version of this function, see 
698  * g_output_stream_write().
699  **/
700 void
701 g_output_stream_write_async (GOutputStream       *stream,
702                              const void          *buffer,
703                              gsize                count,
704                              int                  io_priority,
705                              GCancellable        *cancellable,
706                              GAsyncReadyCallback  callback,
707                              gpointer             user_data)
708 {
709   GOutputStreamClass *class;
710   GError *error = NULL;
711   GTask *task;
712
713   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
714   g_return_if_fail (buffer != NULL);
715
716   task = g_task_new (stream, cancellable, callback, user_data);
717   g_task_set_source_tag (task, g_output_stream_write_async);
718   g_task_set_priority (task, io_priority);
719
720   if (count == 0)
721     {
722       g_task_return_int (task, 0);
723       g_object_unref (task);
724       return;
725     }
726
727   if (((gssize) count) < 0)
728     {
729       g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
730                                _("Too large count value passed to %s"),
731                                G_STRFUNC);
732       g_object_unref (task);
733       return;
734     }
735
736   if (!g_output_stream_set_pending (stream, &error))
737     {
738       g_task_return_error (task, error);
739       g_object_unref (task);
740       return;
741     }
742   
743   class = G_OUTPUT_STREAM_GET_CLASS (stream);
744
745   class->write_async (stream, buffer, count, io_priority, cancellable,
746                       async_ready_write_callback_wrapper, task);
747 }
748
749 /**
750  * g_output_stream_write_finish:
751  * @stream: a #GOutputStream.
752  * @result: a #GAsyncResult.
753  * @error: a #GError location to store the error occurring, or %NULL to 
754  * ignore.
755  * 
756  * Finishes a stream write operation.
757  * 
758  * Returns: a #gssize containing the number of bytes written to the stream.
759  **/
760 gssize
761 g_output_stream_write_finish (GOutputStream  *stream,
762                               GAsyncResult   *result,
763                               GError        **error)
764 {
765   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
766   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
767   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_write_async), FALSE);
768
769   /* @result is always the GTask created by g_output_stream_write_async();
770    * we called class->write_finish() from async_ready_write_callback_wrapper.
771    */
772   return g_task_propagate_int (G_TASK (result), error);
773 }
774
775 static void
776 write_bytes_callback (GObject      *stream,
777                       GAsyncResult *result,
778                       gpointer      user_data)
779 {
780   GTask *task = user_data;
781   GError *error = NULL;
782   gssize nwrote;
783
784   nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (stream),
785                                          result, &error);
786   if (nwrote == -1)
787     g_task_return_error (task, error);
788   else
789     g_task_return_int (task, nwrote);
790   g_object_unref (task);
791 }
792
793 /**
794  * g_output_stream_write_bytes_async:
795  * @stream: A #GOutputStream.
796  * @bytes: The bytes to write
797  * @io_priority: the io priority of the request.
798  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
799  * @callback: (scope async): callback to call when the request is satisfied
800  * @user_data: (closure): the data to pass to callback function
801  *
802  * Request an asynchronous write of the data in @bytes to the stream.
803  * When the operation is finished @callback will be called. You can
804  * then call g_output_stream_write_bytes_finish() to get the result of
805  * the operation.
806  *
807  * During an async request no other sync and async calls are allowed,
808  * and will result in %G_IO_ERROR_PENDING errors.
809  *
810  * A #GBytes larger than %G_MAXSSIZE will cause a
811  * %G_IO_ERROR_INVALID_ARGUMENT error.
812  *
813  * On success, the number of bytes written will be passed to the
814  * @callback. It is not an error if this is not the same as the
815  * requested size, as it can happen e.g. on a partial I/O error,
816  * but generally we try to write as many bytes as requested.
817  *
818  * You are guaranteed that this method will never fail with
819  * %G_IO_ERROR_WOULD_BLOCK - if @stream can't accept more data, the
820  * method will just wait until this changes.
821  *
822  * Any outstanding I/O request with higher priority (lower numerical
823  * value) will be executed before an outstanding request with lower
824  * priority. Default priority is %G_PRIORITY_DEFAULT.
825  *
826  * For the synchronous, blocking version of this function, see
827  * g_output_stream_write_bytes().
828  **/
829 void
830 g_output_stream_write_bytes_async (GOutputStream       *stream,
831                                    GBytes              *bytes,
832                                    int                  io_priority,
833                                    GCancellable        *cancellable,
834                                    GAsyncReadyCallback  callback,
835                                    gpointer             user_data)
836 {
837   GTask *task;
838   gsize size;
839   gconstpointer data;
840
841   data = g_bytes_get_data (bytes, &size);
842
843   task = g_task_new (stream, cancellable, callback, user_data);
844   g_task_set_task_data (task, g_bytes_ref (bytes),
845                         (GDestroyNotify) g_bytes_unref);
846
847   g_output_stream_write_async (stream,
848                                data, size,
849                                io_priority,
850                                cancellable,
851                                write_bytes_callback,
852                                task);
853 }
854
855 /**
856  * g_output_stream_write_bytes_finish:
857  * @stream: a #GOutputStream.
858  * @result: a #GAsyncResult.
859  * @error: a #GError location to store the error occurring, or %NULL to
860  * ignore.
861  *
862  * Finishes a stream write-from-#GBytes operation.
863  *
864  * Returns: a #gssize containing the number of bytes written to the stream.
865  **/
866 gssize
867 g_output_stream_write_bytes_finish (GOutputStream  *stream,
868                                     GAsyncResult   *result,
869                                     GError        **error)
870 {
871   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
872   g_return_val_if_fail (g_task_is_valid (result, stream), -1);
873
874   return g_task_propagate_int (G_TASK (result), error);
875 }
876
877 static void
878 async_ready_splice_callback_wrapper (GObject      *source_object,
879                                      GAsyncResult *res,
880                                      gpointer     _data)
881 {
882   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
883   GOutputStreamClass *class;
884   GTask *task = _data;
885   gssize nspliced;
886   GError *error = NULL;
887
888   g_output_stream_clear_pending (stream);
889   
890   if (g_async_result_legacy_propagate_error (res, &error))
891     nspliced = -1;
892   else
893     {
894       class = G_OUTPUT_STREAM_GET_CLASS (stream);
895       nspliced = class->splice_finish (stream, res, &error);
896     }
897
898   if (nspliced >= 0)
899     g_task_return_int (task, nspliced);
900   else
901     g_task_return_error (task, error);
902   g_object_unref (task);
903 }
904
905 /**
906  * g_output_stream_splice_async:
907  * @stream: a #GOutputStream.
908  * @source: a #GInputStream. 
909  * @flags: a set of #GOutputStreamSpliceFlags.
910  * @io_priority: the io priority of the request.
911  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore. 
912  * @callback: (scope async): a #GAsyncReadyCallback. 
913  * @user_data: (closure): user data passed to @callback.
914  * 
915  * Splices a stream asynchronously.
916  * When the operation is finished @callback will be called.
917  * You can then call g_output_stream_splice_finish() to get the 
918  * result of the operation.
919  *
920  * For the synchronous, blocking version of this function, see 
921  * g_output_stream_splice().
922  **/
923 void
924 g_output_stream_splice_async (GOutputStream            *stream,
925                               GInputStream             *source,
926                               GOutputStreamSpliceFlags  flags,
927                               int                       io_priority,
928                               GCancellable             *cancellable,
929                               GAsyncReadyCallback       callback,
930                               gpointer                  user_data)
931 {
932   GOutputStreamClass *class;
933   GTask *task;
934   GError *error = NULL;
935
936   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
937   g_return_if_fail (G_IS_INPUT_STREAM (source));
938
939   task = g_task_new (stream, cancellable, callback, user_data);
940   g_task_set_source_tag (task, g_output_stream_splice_async);
941   g_task_set_priority (task, io_priority);
942   g_task_set_task_data (task, g_object_ref (source), g_object_unref);
943
944   if (g_input_stream_is_closed (source))
945     {
946       g_task_return_new_error (task,
947                                G_IO_ERROR, G_IO_ERROR_CLOSED,
948                                _("Source stream is already closed"));
949       g_object_unref (task);
950       return;
951     }
952   
953   if (!g_output_stream_set_pending (stream, &error))
954     {
955       g_task_return_error (task, error);
956       g_object_unref (task);
957       return;
958     }
959
960   class = G_OUTPUT_STREAM_GET_CLASS (stream);
961
962   class->splice_async (stream, source, flags, io_priority, cancellable,
963                        async_ready_splice_callback_wrapper, task);
964 }
965
966 /**
967  * g_output_stream_splice_finish:
968  * @stream: a #GOutputStream.
969  * @result: a #GAsyncResult.
970  * @error: a #GError location to store the error occurring, or %NULL to 
971  * ignore.
972  *
973  * Finishes an asynchronous stream splice operation.
974  * 
975  * Returns: a #gssize of the number of bytes spliced. Note that if the
976  *     number of bytes spliced is greater than %G_MAXSSIZE, then that
977  *     will be returned, and there is no way to determine the actual
978  *     number of bytes spliced.
979  **/
980 gssize
981 g_output_stream_splice_finish (GOutputStream  *stream,
982                                GAsyncResult   *result,
983                                GError        **error)
984 {
985   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
986   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
987   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_splice_async), FALSE);
988
989   /* @result is always the GTask created by g_output_stream_splice_async();
990    * we called class->splice_finish() from async_ready_splice_callback_wrapper.
991    */
992   return g_task_propagate_int (G_TASK (result), error);
993 }
994
995 static void
996 async_ready_flush_callback_wrapper (GObject      *source_object,
997                                     GAsyncResult *res,
998                                     gpointer      user_data)
999 {
1000   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1001   GOutputStreamClass *class;
1002   GTask *task = user_data;
1003   gboolean flushed;
1004   GError *error = NULL;
1005
1006   g_output_stream_clear_pending (stream);
1007   
1008   if (g_async_result_legacy_propagate_error (res, &error))
1009     flushed = FALSE;
1010   else
1011     {
1012       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1013       flushed = class->flush_finish (stream, res, &error);
1014     }
1015
1016   if (flushed)
1017     g_task_return_boolean (task, TRUE);
1018   else
1019     g_task_return_error (task, error);
1020   g_object_unref (task);
1021 }
1022
1023 /**
1024  * g_output_stream_flush_async:
1025  * @stream: a #GOutputStream.
1026  * @io_priority: the io priority of the request.
1027  * @cancellable: (allow-none): optional #GCancellable object, %NULL to ignore.
1028  * @callback: (scope async): a #GAsyncReadyCallback to call when the request is satisfied
1029  * @user_data: (closure): the data to pass to callback function
1030  * 
1031  * Forces an asynchronous write of all user-space buffered data for
1032  * the given @stream.
1033  * For behaviour details see g_output_stream_flush().
1034  *
1035  * When the operation is finished @callback will be 
1036  * called. You can then call g_output_stream_flush_finish() to get the 
1037  * result of the operation.
1038  **/
1039 void
1040 g_output_stream_flush_async (GOutputStream       *stream,
1041                              int                  io_priority,
1042                              GCancellable        *cancellable,
1043                              GAsyncReadyCallback  callback,
1044                              gpointer             user_data)
1045 {
1046   GOutputStreamClass *class;
1047   GTask *task;
1048   GError *error = NULL;
1049
1050   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1051
1052   task = g_task_new (stream, cancellable, callback, user_data);
1053   g_task_set_source_tag (task, g_output_stream_flush_async);
1054   g_task_set_priority (task, io_priority);
1055
1056   if (!g_output_stream_set_pending (stream, &error))
1057     {
1058       g_task_return_error (task, error);
1059       g_object_unref (task);
1060       return;
1061     }
1062
1063   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1064   
1065   if (class->flush_async == NULL)
1066     {
1067       g_task_return_boolean (task, TRUE);
1068       g_object_unref (task);
1069       return;
1070     }
1071       
1072   class->flush_async (stream, io_priority, cancellable,
1073                       async_ready_flush_callback_wrapper, task);
1074 }
1075
1076 /**
1077  * g_output_stream_flush_finish:
1078  * @stream: a #GOutputStream.
1079  * @result: a GAsyncResult.
1080  * @error: a #GError location to store the error occurring, or %NULL to 
1081  * ignore.
1082  * 
1083  * Finishes flushing an output stream.
1084  * 
1085  * Returns: %TRUE if flush operation succeeded, %FALSE otherwise.
1086  **/
1087 gboolean
1088 g_output_stream_flush_finish (GOutputStream  *stream,
1089                               GAsyncResult   *result,
1090                               GError        **error)
1091 {
1092   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1093   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1094   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_flush_async), FALSE);
1095
1096   /* @result is always the GTask created by g_output_stream_flush_async();
1097    * we called class->flush_finish() from async_ready_flush_callback_wrapper.
1098    */
1099   return g_task_propagate_boolean (G_TASK (result), error);
1100 }
1101
1102
1103 static void
1104 async_ready_close_callback_wrapper (GObject      *source_object,
1105                                     GAsyncResult *res,
1106                                     gpointer      user_data)
1107 {
1108   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1109   GOutputStreamClass *class;
1110   GTask *task = user_data;
1111   GError *error = g_task_get_task_data (task);
1112
1113   stream->priv->closing = FALSE;
1114   stream->priv->closed = TRUE;
1115
1116   if (!error && !g_async_result_legacy_propagate_error (res, &error))
1117     {
1118       class = G_OUTPUT_STREAM_GET_CLASS (stream);
1119
1120       class->close_finish (stream, res,
1121                            error ? NULL : &error);
1122     }
1123
1124   if (error != NULL)
1125     g_task_return_error (task, error);
1126   else
1127     g_task_return_boolean (task, TRUE);
1128   g_object_unref (task);
1129 }
1130
1131 static void
1132 async_ready_close_flushed_callback_wrapper (GObject      *source_object,
1133                                             GAsyncResult *res,
1134                                             gpointer      user_data)
1135 {
1136   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1137   GOutputStreamClass *class;
1138   GTask *task = user_data;
1139   GError *error = NULL;
1140
1141   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1142
1143   if (!g_async_result_legacy_propagate_error (res, &error))
1144     {
1145       class->flush_finish (stream, res, &error);
1146     }
1147
1148   /* propagate the possible error */
1149   if (error)
1150     g_task_set_task_data (task, error, NULL);
1151
1152   /* we still close, even if there was a flush error */
1153   class->close_async (stream,
1154                       g_task_get_priority (task),
1155                       g_task_get_cancellable (task),
1156                       async_ready_close_callback_wrapper, task);
1157 }
1158
1159 static void
1160 real_close_async_cb (GObject      *source_object,
1161                      GAsyncResult *res,
1162                      gpointer      user_data)
1163 {
1164   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
1165   GTask *task = user_data;
1166   GError *error = NULL;
1167   gboolean ret;
1168
1169   g_output_stream_clear_pending (stream);
1170
1171   ret = g_output_stream_internal_close_finish (stream, res, &error);
1172
1173   if (error != NULL)
1174     g_task_return_error (task, error);
1175   else
1176     g_task_return_boolean (task, ret);
1177
1178   g_object_unref (task);
1179 }
1180
1181 /**
1182  * g_output_stream_close_async:
1183  * @stream: A #GOutputStream.
1184  * @io_priority: the io priority of the request.
1185  * @cancellable: (allow-none): optional cancellable object
1186  * @callback: (scope async): callback to call when the request is satisfied
1187  * @user_data: (closure): the data to pass to callback function
1188  *
1189  * Requests an asynchronous close of the stream, releasing resources 
1190  * related to it. When the operation is finished @callback will be 
1191  * called. You can then call g_output_stream_close_finish() to get 
1192  * the result of the operation.
1193  *
1194  * For behaviour details see g_output_stream_close().
1195  *
1196  * The asyncronous methods have a default fallback that uses threads 
1197  * to implement asynchronicity, so they are optional for inheriting 
1198  * classes. However, if you override one you must override all.
1199  **/
1200 void
1201 g_output_stream_close_async (GOutputStream       *stream,
1202                              int                  io_priority,
1203                              GCancellable        *cancellable,
1204                              GAsyncReadyCallback  callback,
1205                              gpointer             user_data)
1206 {
1207   GTask *task;
1208   GError *error = NULL;
1209
1210   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1211   
1212   task = g_task_new (stream, cancellable, callback, user_data);
1213   g_task_set_source_tag (task, g_output_stream_close_async);
1214   g_task_set_priority (task, io_priority);
1215
1216   if (!g_output_stream_set_pending (stream, &error))
1217     {
1218       g_task_return_error (task, error);
1219       g_object_unref (task);
1220       return;
1221     }
1222
1223   g_output_stream_internal_close_async (stream, io_priority, cancellable,
1224                                         real_close_async_cb, task);
1225 }
1226
1227 /* Must always be called inside
1228  * g_output_stream_set_pending()/g_output_stream_clear_pending().
1229  */
1230 void
1231 g_output_stream_internal_close_async (GOutputStream       *stream,
1232                                       int                  io_priority,
1233                                       GCancellable        *cancellable,
1234                                       GAsyncReadyCallback  callback,
1235                                       gpointer             user_data)
1236 {
1237   GOutputStreamClass *class;
1238   GTask *task;
1239
1240   task = g_task_new (stream, cancellable, callback, user_data);
1241   g_task_set_source_tag (task, g_output_stream_internal_close_async);
1242   g_task_set_priority (task, io_priority);
1243
1244   if (stream->priv->closed)
1245     {
1246       g_task_return_boolean (task, TRUE);
1247       g_object_unref (task);
1248       return;
1249     }
1250
1251   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1252   stream->priv->closing = TRUE;
1253
1254   /* Call close_async directly if there is no need to flush, or if the flush
1255      can be done sync (in the output stream async close thread) */
1256   if (class->flush_async == NULL ||
1257       (class->flush_async == g_output_stream_real_flush_async &&
1258        (class->flush == NULL || class->close_async == g_output_stream_real_close_async)))
1259     {
1260       class->close_async (stream, io_priority, cancellable,
1261                           async_ready_close_callback_wrapper, task);
1262     }
1263   else
1264     {
1265       /* First do an async flush, then do the async close in the callback
1266          wrapper (see async_ready_close_flushed_callback_wrapper) */
1267       class->flush_async (stream, io_priority, cancellable,
1268                           async_ready_close_flushed_callback_wrapper, task);
1269     }
1270 }
1271
1272 static gboolean
1273 g_output_stream_internal_close_finish (GOutputStream  *stream,
1274                                        GAsyncResult   *result,
1275                                        GError        **error)
1276 {
1277   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1278   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1279   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_internal_close_async), FALSE);
1280
1281   return g_task_propagate_boolean (G_TASK (result), error);
1282 }
1283
1284 /**
1285  * g_output_stream_close_finish:
1286  * @stream: a #GOutputStream.
1287  * @result: a #GAsyncResult.
1288  * @error: a #GError location to store the error occurring, or %NULL to 
1289  * ignore.
1290  * 
1291  * Closes an output stream.
1292  * 
1293  * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
1294  **/
1295 gboolean
1296 g_output_stream_close_finish (GOutputStream  *stream,
1297                               GAsyncResult   *result,
1298                               GError        **error)
1299 {
1300   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1301   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1302   g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_close_async), FALSE);
1303
1304   /* @result is always the GTask created by g_output_stream_close_async();
1305    * we called class->close_finish() from async_ready_close_callback_wrapper.
1306    */
1307   return g_task_propagate_boolean (G_TASK (result), error);
1308 }
1309
1310 /**
1311  * g_output_stream_is_closed:
1312  * @stream: a #GOutputStream.
1313  * 
1314  * Checks if an output stream has already been closed.
1315  * 
1316  * Returns: %TRUE if @stream is closed. %FALSE otherwise. 
1317  **/
1318 gboolean
1319 g_output_stream_is_closed (GOutputStream *stream)
1320 {
1321   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1322   
1323   return stream->priv->closed;
1324 }
1325
1326 /**
1327  * g_output_stream_is_closing:
1328  * @stream: a #GOutputStream.
1329  *
1330  * Checks if an output stream is being closed. This can be
1331  * used inside e.g. a flush implementation to see if the
1332  * flush (or other i/o operation) is called from within
1333  * the closing operation.
1334  *
1335  * Returns: %TRUE if @stream is being closed. %FALSE otherwise.
1336  *
1337  * Since: 2.24
1338  **/
1339 gboolean
1340 g_output_stream_is_closing (GOutputStream *stream)
1341 {
1342   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1343
1344   return stream->priv->closing;
1345 }
1346
1347 /**
1348  * g_output_stream_has_pending:
1349  * @stream: a #GOutputStream.
1350  * 
1351  * Checks if an ouput stream has pending actions.
1352  * 
1353  * Returns: %TRUE if @stream has pending actions. 
1354  **/
1355 gboolean
1356 g_output_stream_has_pending (GOutputStream *stream)
1357 {
1358   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1359   
1360   return stream->priv->pending;
1361 }
1362
1363 /**
1364  * g_output_stream_set_pending:
1365  * @stream: a #GOutputStream.
1366  * @error: a #GError location to store the error occurring, or %NULL to 
1367  * ignore.
1368  * 
1369  * Sets @stream to have actions pending. If the pending flag is
1370  * already set or @stream is closed, it will return %FALSE and set
1371  * @error.
1372  *
1373  * Return value: %TRUE if pending was previously unset and is now set.
1374  **/
1375 gboolean
1376 g_output_stream_set_pending (GOutputStream *stream,
1377                              GError **error)
1378 {
1379   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1380   
1381   if (stream->priv->closed)
1382     {
1383       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
1384                            _("Stream is already closed"));
1385       return FALSE;
1386     }
1387   
1388   if (stream->priv->pending)
1389     {
1390       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_PENDING,
1391                            /* Translators: This is an error you get if there is
1392                             * already an operation running against this stream when
1393                             * you try to start one */
1394                            _("Stream has outstanding operation"));
1395       return FALSE;
1396     }
1397   
1398   stream->priv->pending = TRUE;
1399   return TRUE;
1400 }
1401
1402 /**
1403  * g_output_stream_clear_pending:
1404  * @stream: output stream
1405  * 
1406  * Clears the pending flag on @stream.
1407  **/
1408 void
1409 g_output_stream_clear_pending (GOutputStream *stream)
1410 {
1411   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1412   
1413   stream->priv->pending = FALSE;
1414 }
1415
1416 /**
1417  * g_output_stream_async_write_is_via_threads:
1418  * @stream: a #GOutputStream.
1419  *
1420  * Checks if an ouput stream's write_async function uses threads.
1421  *
1422  * Returns: %TRUE if @stream's write_async function uses threads.
1423  **/
1424 gboolean
1425 g_output_stream_async_write_is_via_threads (GOutputStream *stream)
1426 {
1427   GOutputStreamClass *class;
1428
1429   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1430
1431   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1432
1433   return (class->write_async == g_output_stream_real_write_async &&
1434       !(G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
1435         g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
1436 }
1437
1438
1439 /********************************************
1440  *   Default implementation of async ops    *
1441  ********************************************/
1442
1443 typedef struct {
1444   const void         *buffer;
1445   gsize               count_requested;
1446   gssize              count_written;
1447 } WriteData;
1448
1449 static void
1450 free_write_data (WriteData *op)
1451 {
1452   g_slice_free (WriteData, op);
1453 }
1454
1455 static void
1456 write_async_thread (GTask        *task,
1457                     gpointer      source_object,
1458                     gpointer      task_data,
1459                     GCancellable *cancellable)
1460 {
1461   GOutputStream *stream = source_object;
1462   WriteData *op = task_data;
1463   GOutputStreamClass *class;
1464   GError *error = NULL;
1465   gssize count_written;
1466
1467   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1468   count_written = class->write_fn (stream, op->buffer, op->count_requested,
1469                                    cancellable, &error);
1470   if (count_written == -1)
1471     g_task_return_error (task, error);
1472   else
1473     g_task_return_int (task, count_written);
1474 }
1475
1476 static void write_async_pollable (GPollableOutputStream *stream,
1477                                   GTask                 *task);
1478
1479 static gboolean
1480 write_async_pollable_ready (GPollableOutputStream *stream,
1481                             gpointer               user_data)
1482 {
1483   GTask *task = user_data;
1484
1485   write_async_pollable (stream, task);
1486   return FALSE;
1487 }
1488
1489 static void
1490 write_async_pollable (GPollableOutputStream *stream,
1491                       GTask                 *task)
1492 {
1493   GError *error = NULL;
1494   WriteData *op = g_task_get_task_data (task);
1495   gssize count_written;
1496
1497   if (g_task_return_error_if_cancelled (task))
1498     return;
1499
1500   count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
1501     write_nonblocking (stream, op->buffer, op->count_requested, &error);
1502
1503   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
1504     {
1505       GSource *source;
1506
1507       g_error_free (error);
1508
1509       source = g_pollable_output_stream_create_source (stream,
1510                                                        g_task_get_cancellable (task));
1511       g_task_attach_source (task, source,
1512                             (GSourceFunc) write_async_pollable_ready);
1513       g_source_unref (source);
1514       return;
1515     }
1516
1517   if (count_written == -1)
1518     g_task_return_error (task, error);
1519   else
1520     g_task_return_int (task, count_written);
1521 }
1522
1523 static void
1524 g_output_stream_real_write_async (GOutputStream       *stream,
1525                                   const void          *buffer,
1526                                   gsize                count,
1527                                   int                  io_priority,
1528                                   GCancellable        *cancellable,
1529                                   GAsyncReadyCallback  callback,
1530                                   gpointer             user_data)
1531 {
1532   GTask *task;
1533   WriteData *op;
1534
1535   op = g_slice_new0 (WriteData);
1536   task = g_task_new (stream, cancellable, callback, user_data);
1537   g_task_set_check_cancellable (task, FALSE);
1538   g_task_set_task_data (task, op, (GDestroyNotify) free_write_data);
1539   op->buffer = buffer;
1540   op->count_requested = count;
1541
1542   if (!g_output_stream_async_write_is_via_threads (stream))
1543     write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
1544   else
1545     g_task_run_in_thread (task, write_async_thread);
1546   g_object_unref (task);
1547 }
1548
1549 static gssize
1550 g_output_stream_real_write_finish (GOutputStream  *stream,
1551                                    GAsyncResult   *result,
1552                                    GError        **error)
1553 {
1554   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1555
1556   return g_task_propagate_int (G_TASK (result), error);
1557 }
1558
1559 typedef struct {
1560   GInputStream *source;
1561   GOutputStreamSpliceFlags flags;
1562 } SpliceData;
1563
1564 static void
1565 free_splice_data (SpliceData *op)
1566 {
1567   g_object_unref (op->source);
1568   g_free (op);
1569 }
1570
1571 static void
1572 splice_async_thread (GTask        *task,
1573                      gpointer      source_object,
1574                      gpointer      task_data,
1575                      GCancellable *cancellable)
1576 {
1577   GOutputStream *stream = source_object;
1578   SpliceData *op = task_data;
1579   GOutputStreamClass *class;
1580   GError *error = NULL;
1581   gssize bytes_copied;
1582
1583   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1584   
1585   bytes_copied = class->splice (stream,
1586                                 op->source,
1587                                 op->flags,
1588                                 cancellable,
1589                                 &error);
1590   if (bytes_copied == -1)
1591     g_task_return_error (task, error);
1592   else
1593     g_task_return_int (task, bytes_copied);
1594 }
1595
1596 static void
1597 g_output_stream_real_splice_async (GOutputStream             *stream,
1598                                    GInputStream              *source,
1599                                    GOutputStreamSpliceFlags   flags,
1600                                    int                        io_priority,
1601                                    GCancellable              *cancellable,
1602                                    GAsyncReadyCallback        callback,
1603                                    gpointer                   user_data)
1604 {
1605   GTask *task;
1606   SpliceData *op;
1607
1608   op = g_new0 (SpliceData, 1);
1609   task = g_task_new (stream, cancellable, callback, user_data);
1610   g_task_set_task_data (task, op, (GDestroyNotify)free_splice_data);
1611   op->flags = flags;
1612   op->source = g_object_ref (source);
1613
1614   /* TODO: In the case where both source and destintion have
1615      non-threadbased async calls we can use a true async copy here */
1616   
1617   g_task_run_in_thread (task, splice_async_thread);
1618   g_object_unref (task);
1619 }
1620
1621 static gssize
1622 g_output_stream_real_splice_finish (GOutputStream  *stream,
1623                                     GAsyncResult   *result,
1624                                     GError        **error)
1625 {
1626   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1627
1628   return g_task_propagate_int (G_TASK (result), error);
1629 }
1630
1631
1632 static void
1633 flush_async_thread (GTask        *task,
1634                     gpointer      source_object,
1635                     gpointer      task_data,
1636                     GCancellable *cancellable)
1637 {
1638   GOutputStream *stream = source_object;
1639   GOutputStreamClass *class;
1640   gboolean result;
1641   GError *error = NULL;
1642
1643   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1644   result = TRUE;
1645   if (class->flush)
1646     result = class->flush (stream, cancellable, &error);
1647
1648   if (result)
1649     g_task_return_boolean (task, TRUE);
1650   else
1651     g_task_return_error (task, error);
1652 }
1653
1654 static void
1655 g_output_stream_real_flush_async (GOutputStream       *stream,
1656                                   int                  io_priority,
1657                                   GCancellable        *cancellable,
1658                                   GAsyncReadyCallback  callback,
1659                                   gpointer             user_data)
1660 {
1661   GTask *task;
1662
1663   task = g_task_new (stream, cancellable, callback, user_data);
1664   g_task_set_priority (task, io_priority);
1665   g_task_run_in_thread (task, flush_async_thread);
1666   g_object_unref (task);
1667 }
1668
1669 static gboolean
1670 g_output_stream_real_flush_finish (GOutputStream  *stream,
1671                                    GAsyncResult   *result,
1672                                    GError        **error)
1673 {
1674   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1675
1676   return g_task_propagate_boolean (G_TASK (result), error);
1677 }
1678
1679 static void
1680 close_async_thread (GTask        *task,
1681                     gpointer      source_object,
1682                     gpointer      task_data,
1683                     GCancellable *cancellable)
1684 {
1685   GOutputStream *stream = source_object;
1686   GOutputStreamClass *class;
1687   GError *error = NULL;
1688   gboolean result = TRUE;
1689
1690   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1691
1692   /* Do a flush here if there is a flush function, and we did not have to do
1693    * an async flush before (see g_output_stream_close_async)
1694    */
1695   if (class->flush != NULL &&
1696       (class->flush_async == NULL ||
1697        class->flush_async == g_output_stream_real_flush_async))
1698     {
1699       result = class->flush (stream, cancellable, &error);
1700     }
1701
1702   /* Auto handling of cancelation disabled, and ignore
1703      cancellation, since we want to close things anyway, although
1704      possibly in a quick-n-dirty way. At least we never want to leak
1705      open handles */
1706
1707   if (class->close_fn)
1708     {
1709       /* Make sure to close, even if the flush failed (see sync close) */
1710       if (!result)
1711         class->close_fn (stream, cancellable, NULL);
1712       else
1713         result = class->close_fn (stream, cancellable, &error);
1714     }
1715
1716   if (result)
1717     g_task_return_boolean (task, TRUE);
1718   else
1719     g_task_return_error (task, error);
1720 }
1721
1722 static void
1723 g_output_stream_real_close_async (GOutputStream       *stream,
1724                                   int                  io_priority,
1725                                   GCancellable        *cancellable,
1726                                   GAsyncReadyCallback  callback,
1727                                   gpointer             user_data)
1728 {
1729   GTask *task;
1730
1731   task = g_task_new (stream, cancellable, callback, user_data);
1732   g_task_set_priority (task, io_priority);
1733   g_task_run_in_thread (task, close_async_thread);
1734   g_object_unref (task);
1735 }
1736
1737 static gboolean
1738 g_output_stream_real_close_finish (GOutputStream  *stream,
1739                                    GAsyncResult   *result,
1740                                    GError        **error)
1741 {
1742   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
1743
1744   return g_task_propagate_boolean (G_TASK (result), error);
1745 }