Don't cheat and unset the "pending" flag around inner calls. Instead, call
[platform/upstream/glib.git] / gio / goutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include <config.h>
24 #include "goutputstream.h"
25 #include "gsimpleasyncresult.h"
26 #include "glibintl.h"
27
28 #include "gioalias.h"
29
30 /**
31  * SECTION:goutputstream
32  * @short_description: Base class for implementing streaming output
33  * 
34  * 
35  *
36  **/
37
38 G_DEFINE_TYPE (GOutputStream, g_output_stream, G_TYPE_OBJECT);
39
40 struct _GOutputStreamPrivate {
41   guint closed : 1;
42   guint pending : 1;
43   guint cancelled : 1;
44   GAsyncReadyCallback outstanding_callback;
45 };
46
47 static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
48                                                     GInputStream              *source,
49                                                     GOutputStreamSpliceFlags   flags,
50                                                     GCancellable              *cancellable,
51                                                     GError                   **error);
52 static void     g_output_stream_real_write_async   (GOutputStream             *stream,
53                                                     const void                *buffer,
54                                                     gsize                      count,
55                                                     int                        io_priority,
56                                                     GCancellable              *cancellable,
57                                                     GAsyncReadyCallback        callback,
58                                                     gpointer                   data);
59 static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
60                                                     GAsyncResult              *result,
61                                                     GError                   **error);
62 static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
63                                                     GInputStream              *source,
64                                                     GOutputStreamSpliceFlags   flags,
65                                                     int                        io_priority,
66                                                     GCancellable              *cancellable,
67                                                     GAsyncReadyCallback        callback,
68                                                     gpointer                   data);
69 static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
70                                                     GAsyncResult              *result,
71                                                     GError                   **error);
72 static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
73                                                     int                        io_priority,
74                                                     GCancellable              *cancellable,
75                                                     GAsyncReadyCallback        callback,
76                                                     gpointer                   data);
77 static gboolean g_output_stream_real_flush_finish  (GOutputStream             *stream,
78                                                     GAsyncResult              *result,
79                                                     GError                   **error);
80 static void     g_output_stream_real_close_async   (GOutputStream             *stream,
81                                                     int                        io_priority,
82                                                     GCancellable              *cancellable,
83                                                     GAsyncReadyCallback        callback,
84                                                     gpointer                   data);
85 static gboolean g_output_stream_real_close_finish  (GOutputStream             *stream,
86                                                     GAsyncResult              *result,
87                                                     GError                   **error);
88
89 static void
90 g_output_stream_finalize (GObject *object)
91 {
92   GOutputStream *stream;
93
94   stream = G_OUTPUT_STREAM (object);
95   
96   if (G_OBJECT_CLASS (g_output_stream_parent_class)->finalize)
97     (*G_OBJECT_CLASS (g_output_stream_parent_class)->finalize) (object);
98 }
99
100 static void
101 g_output_stream_dispose (GObject *object)
102 {
103   GOutputStream *stream;
104
105   stream = G_OUTPUT_STREAM (object);
106   
107   if (!stream->priv->closed)
108     g_output_stream_close (stream, NULL, NULL);
109   
110   if (G_OBJECT_CLASS (g_output_stream_parent_class)->dispose)
111     (*G_OBJECT_CLASS (g_output_stream_parent_class)->dispose) (object);
112 }
113
114 static void
115 g_output_stream_class_init (GOutputStreamClass *klass)
116 {
117   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
118   
119   g_type_class_add_private (klass, sizeof (GOutputStreamPrivate));
120   
121   gobject_class->finalize = g_output_stream_finalize;
122   gobject_class->dispose = g_output_stream_dispose;
123
124   klass->splice = g_output_stream_real_splice;
125   
126   klass->write_async = g_output_stream_real_write_async;
127   klass->write_finish = g_output_stream_real_write_finish;
128   klass->splice_async = g_output_stream_real_splice_async;
129   klass->splice_finish = g_output_stream_real_splice_finish;
130   klass->flush_async = g_output_stream_real_flush_async;
131   klass->flush_finish = g_output_stream_real_flush_finish;
132   klass->close_async = g_output_stream_real_close_async;
133   klass->close_finish = g_output_stream_real_close_finish;
134 }
135
136 static void
137 g_output_stream_init (GOutputStream *stream)
138 {
139   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
140                                               G_TYPE_OUTPUT_STREAM,
141                                               GOutputStreamPrivate);
142 }
143
144 /**
145  * g_output_stream_write:
146  * @stream: a #GOutputStream.
147  * @buffer: the buffer containing the data to write. 
148  * @count: the number of bytes to write
149  * @cancellable: optional cancellable object
150  * @error: location to store the error occuring, or %NULL to ignore
151  *
152  * Tries to write @count bytes from @buffer into the stream. Will block
153  * during the operation.
154  * 
155  * If count is zero returns zero and does nothing. A value of @count
156  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
157  *
158  * On success, the number of bytes written to the stream is returned.
159  * It is not an error if this is not the same as the requested size, as it
160  * can happen e.g. on a partial i/o error, or if the there is not enough
161  * storage in the stream. All writes either block until at least one byte
162  * is written, so zero is never returned (unless @count is zero).
163  * 
164  * If @cancellable is not NULL, then the operation can be cancelled by
165  * triggering the cancellable object from another thread. If the operation
166  * was cancelled, the error G_IO_ERROR_CANCELLED will be returned. If an
167  * operation was partially finished when the operation was cancelled the
168  * partial result will be returned, without an error.
169  *
170  * On error -1 is returned and @error is set accordingly.
171  * 
172  * Return value: Number of bytes written, or -1 on error
173  **/
174 gssize
175 g_output_stream_write (GOutputStream  *stream,
176                        const void     *buffer,
177                        gsize           count,
178                        GCancellable   *cancellable,
179                        GError        **error)
180 {
181   GOutputStreamClass *class;
182   gssize res;
183
184   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
185   g_return_val_if_fail (buffer != NULL, 0);
186
187   if (count == 0)
188     return 0;
189   
190   if (((gssize) count) < 0)
191     {
192       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
193                    _("Too large count value passed to g_output_stream_write"));
194       return -1;
195     }
196
197   if (stream->priv->closed)
198     {
199       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
200                    _("Stream is already closed"));
201       return -1;
202     }
203   
204   if (stream->priv->pending)
205     {
206       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
207                    _("Stream has outstanding operation"));
208       return -1;
209     }
210   
211   class = G_OUTPUT_STREAM_GET_CLASS (stream);
212
213   if (class->write == NULL) 
214     {
215       g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
216                    _("Output stream doesn't implement write"));
217       return -1;
218     }
219   
220   if (cancellable)
221     g_push_current_cancellable (cancellable);
222   
223   stream->priv->pending = TRUE;
224   res = class->write (stream, buffer, count, cancellable, error);
225   stream->priv->pending = FALSE;
226   
227   if (cancellable)
228     g_pop_current_cancellable (cancellable);
229   
230   return res; 
231 }
232
233 /**
234  * g_output_stream_write_all:
235  * @stream: a #GOutputStream.
236  * @buffer: the buffer containing the data to write. 
237  * @count: the number of bytes to write
238  * @bytes_written: location to store the number of bytes that was 
239  *     written to the stream
240  * @cancellable: optional #GCancellable object, %NULL to ignore.
241  * @error: location to store the error occuring, or %NULL to ignore
242  *
243  * Tries to write @count bytes from @buffer into the stream. Will block
244  * during the operation.
245  * 
246  * This function is similar to g_output_stream_write(), except it tries to
247  * write as many bytes as requested, only stopping on an error.
248  *
249  * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
250  * is set to @count.
251  * 
252  * If there is an error during the operation FALSE is returned and @error
253  * is set to indicate the error status, @bytes_written is updated to contain
254  * the number of bytes written into the stream before the error occured.
255  *
256  * Return value: %TRUE on success, %FALSE if there was an error
257  **/
258 gboolean
259 g_output_stream_write_all (GOutputStream  *stream,
260                            const void     *buffer,
261                            gsize           count,
262                            gsize          *bytes_written,
263                            GCancellable   *cancellable,
264                            GError        **error)
265 {
266   gsize _bytes_written;
267   gssize res;
268
269   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
270   g_return_val_if_fail (buffer != NULL, FALSE);
271
272   _bytes_written = 0;
273   while (_bytes_written < count)
274     {
275       res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
276                                    cancellable, error);
277       if (res == -1)
278         {
279           if (bytes_written)
280             *bytes_written = _bytes_written;
281           return FALSE;
282         }
283       
284       if (res == 0)
285         g_warning ("Write returned zero without error");
286
287       _bytes_written += res;
288     }
289   
290   if (bytes_written)
291     *bytes_written = _bytes_written;
292
293   return TRUE;
294 }
295
296 /**
297  * g_output_stream_flush:
298  * @stream: a #GOutputStream.
299  * @cancellable: optional cancellable object
300  * @error: location to store the error occuring, or %NULL to ignore
301  *
302  * Flushed any outstanding buffers in the stream. Will block during 
303  * the operation. Closing the stream will implicitly cause a flush.
304  *
305  * This function is optional for inherited classes.
306  * 
307  * If @cancellable is not %NULL, then the operation can be cancelled by
308  * triggering the cancellable object from another thread. If the operation
309  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
310  *
311  * Return value: %TRUE on success, %FALSE on error
312  **/
313 gboolean
314 g_output_stream_flush (GOutputStream  *stream,
315                        GCancellable   *cancellable,
316                        GError        **error)
317 {
318   GOutputStreamClass *class;
319   gboolean res;
320
321   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
322
323   if (stream->priv->closed)
324     {
325       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
326                    _("Stream is already closed"));
327       return FALSE;
328     }
329
330   if (stream->priv->pending)
331     {
332       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
333                    _("Stream has outstanding operation"));
334       return FALSE;
335     }
336   
337   class = G_OUTPUT_STREAM_GET_CLASS (stream);
338
339   res = TRUE;
340   if (class->flush)
341     {
342       if (cancellable)
343         g_push_current_cancellable (cancellable);
344       
345       stream->priv->pending = TRUE;
346       res = class->flush (stream, cancellable, error);
347       stream->priv->pending = FALSE;
348       
349       if (cancellable)
350         g_pop_current_cancellable (cancellable);
351     }
352   
353   return res;
354 }
355
356 /**
357  * g_output_stream_splice:
358  * @stream: a #GOutputStream.
359  * @source: a #GInputStream.
360  * @flags: a set of #GOutputStreamSpliceFlags.
361  * @cancellable: optional #GCancellable object, %NULL to ignore. 
362  * @error: a #GError location to store the error occuring, or %NULL to 
363  * ignore.
364  *
365  * Splices an input stream into an output stream.
366  *
367  * Returns: a #gssize containig the size of the data spliced.
368  **/
369 gssize
370 g_output_stream_splice (GOutputStream             *stream,
371                         GInputStream              *source,
372                         GOutputStreamSpliceFlags   flags,
373                         GCancellable              *cancellable,
374                         GError                   **error)
375 {
376   GOutputStreamClass *class;
377   gboolean res;
378
379   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
380   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
381
382   if (stream->priv->closed)
383     {
384       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
385                    _("Target stream is already closed"));
386       return -1;
387     }
388
389   if (g_input_stream_is_closed (source))
390     {
391       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
392                    _("Source stream is already closed"));
393       return -1;
394     }
395
396   if (stream->priv->pending)
397     {
398       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
399                    _("Stream has outstanding operation"));
400       return -1;
401     }
402   
403   class = G_OUTPUT_STREAM_GET_CLASS (stream);
404
405   res = TRUE;
406   if (cancellable)
407     g_push_current_cancellable (cancellable);
408       
409   stream->priv->pending = TRUE;
410   res = class->splice (stream, source, flags, cancellable, error);
411   stream->priv->pending = FALSE;
412       
413   if (cancellable)
414     g_pop_current_cancellable (cancellable);
415   
416   return res;
417 }
418
419 static gssize
420 g_output_stream_real_splice (GOutputStream             *stream,
421                              GInputStream              *source,
422                              GOutputStreamSpliceFlags   flags,
423                              GCancellable              *cancellable,
424                              GError                   **error)
425 {
426   GOutputStreamClass *class = G_OUTPUT_STREAM_GET_CLASS (stream);
427   gssize n_read, n_written;
428   gssize bytes_copied;
429   char buffer[8192], *p;
430   gboolean res;
431
432   if (class->write == NULL) 
433     {
434       g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
435                    _("Output stream doesn't implement write"));
436       res = FALSE;
437       goto notsupported;
438     }
439   
440   bytes_copied = 0;
441   res = TRUE;
442   do 
443     {
444       n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
445       if (n_read == -1)
446         {
447           res = FALSE;
448           break;
449         }
450         
451       if (n_read == 0)
452         break;
453
454       p = buffer;
455       while (n_read > 0)
456         {
457           n_written = class->write (stream, p, n_read, cancellable, error);
458           if (n_written == -1)
459             {
460               res = FALSE;
461               break;
462             }
463
464           p += n_written;
465           n_read -= n_written;
466           bytes_copied += n_written;
467         }
468     }
469   while (res);
470
471  notsupported:
472   if (!res)
473     error = NULL; /* Ignore further errors */
474
475   if (flags & G_OUTPUT_STREAM_SPLICE_FLAGS_CLOSE_SOURCE)
476     {
477       /* Don't care about errors in source here */
478       g_input_stream_close (source, cancellable, NULL);
479     }
480
481   if (flags & G_OUTPUT_STREAM_SPLICE_FLAGS_CLOSE_TARGET)
482     {
483       /* But write errors on close are bad! */
484       if (!class->close (stream, cancellable, error))
485         res = FALSE;
486     }
487
488   if (res)
489     return bytes_copied;
490   
491   return -1;
492 }
493
494
495 /**
496  * g_output_stream_close:
497  * @stream: A #GOutputStream.
498  * @cancellable: optional cancellable object
499  * @error: location to store the error occuring, or %NULL to ignore
500  *
501  * Closes the stream, releasing resources related to it.
502  *
503  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
504  * Closing a stream multiple times will not return an error.
505  *
506  * Closing a stream will automatically flush any outstanding buffers in the
507  * stream.
508  *
509  * Streams will be automatically closed when the last reference
510  * is dropped, but you might want to call make sure resources
511  * are released as early as possible.
512  *
513  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
514  * open after the stream is closed. See the documentation for the individual
515  * stream for details.
516  *
517  * On failure the first error that happened will be reported, but the close
518  * operation will finish as much as possible. A stream that failed to
519  * close will still return %G_IO_ERROR_CLOSED all operations. Still, it
520  * is important to check and report the error to the user, otherwise
521  * there might be a loss of data as all data might not be written.
522  * 
523  * If @cancellable is not NULL, then the operation can be cancelled by
524  * triggering the cancellable object from another thread. If the operation
525  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
526  * Cancelling a close will still leave the stream closed, but there some streams
527  * can use a faster close that doesn't block to e.g. check errors. On
528  * cancellation (as with any error) there is no guarantee that all written
529  * data will reach the target. 
530  *
531  * Return value: %TRUE on success, %FALSE on failure
532  **/
533 gboolean
534 g_output_stream_close (GOutputStream  *stream,
535                        GCancellable   *cancellable,
536                        GError        **error)
537 {
538   GOutputStreamClass *class;
539   gboolean res;
540
541   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
542
543   class = G_OUTPUT_STREAM_GET_CLASS (stream);
544
545   if (stream->priv->closed)
546     return TRUE;
547
548   if (stream->priv->pending)
549     {
550       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
551                    _("Stream has outstanding operation"));
552       return FALSE;
553     }
554
555   stream->priv->pending = TRUE;
556   
557   if (cancellable)
558     g_push_current_cancellable (cancellable);
559
560   res = class->flush (stream, cancellable, error);
561
562   if (!res)
563     {
564       /* flushing caused the error that we want to return,
565        * but we still want to close the underlying stream if possible
566        */
567       if (class->close)
568         class->close (stream, cancellable, NULL);
569     }
570   else
571     {
572       res = TRUE;
573       if (class->close)
574         res = class->close (stream, cancellable, error);
575     }
576   
577   if (cancellable)
578     g_pop_current_cancellable (cancellable);
579   
580   stream->priv->closed = TRUE;
581   stream->priv->pending = FALSE;
582   
583   return res;
584 }
585
586 static void
587 async_ready_callback_wrapper (GObject      *source_object,
588                               GAsyncResult *res,
589                               gpointer      user_data)
590 {
591   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
592
593   stream->priv->pending = FALSE;
594   if (stream->priv->outstanding_callback)
595     (*stream->priv->outstanding_callback) (source_object, res, user_data);
596   g_object_unref (stream);
597 }
598
599 static void
600 async_ready_close_callback_wrapper (GObject      *source_object,
601                                     GAsyncResult *res,
602                                     gpointer      user_data)
603 {
604   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
605
606   stream->priv->pending = FALSE;
607   stream->priv->closed = TRUE;
608   if (stream->priv->outstanding_callback)
609     (*stream->priv->outstanding_callback) (source_object, res, user_data);
610   g_object_unref (stream);
611 }
612
613 /**
614  * g_output_stream_write_async:
615  * @stream: A #GOutputStream.
616  * @buffer: the buffer containing the data to write. 
617  * @count: the number of bytes to write
618  * @io_priority: the io priority of the request.
619  * @cancellable: optional #GCancellable object, %NULL to ignore.
620  * @callback: callback to call when the request is satisfied
621  * @user_data: the data to pass to callback function
622  *
623  * Request an asynchronous write of @count bytes from @buffer into 
624  * the stream. When the operation is finished @callback will be called, 
625  * giving the results.
626  *
627  * During an async request no other sync and async calls are allowed, 
628  * and will result in %G_IO_ERROR_PENDING errors. 
629  *
630  * A value of @count larger than %G_MAXSSIZE will cause a 
631  * %G_IO_ERROR_INVALID_ARGUMENT error.
632  *
633  * On success, the number of bytes written will be passed to the
634  * @callback. It is not an error if this is not the same as the 
635  * requested size, as it can happen e.g. on a partial I/O error, 
636  * but generally we try to write as many bytes as requested. 
637  *
638  * Any outstanding I/O request with higher priority (lower numerical 
639  * value) will be executed before an outstanding request with lower 
640  * priority. Default priority is %G_PRIORITY_DEFAULT.
641  *
642  * The asyncronous methods have a default fallback that uses threads 
643  * to implement asynchronicity, so they are optional for inheriting 
644  * classes. However, if you override one you must override all.
645  *
646  * For the synchronous, blocking version of this function, see 
647  * g_output_stream_write().
648  **/
649 void
650 g_output_stream_write_async (GOutputStream       *stream,
651                              const void          *buffer,
652                              gsize                count,
653                              int                  io_priority,
654                              GCancellable        *cancellable,
655                              GAsyncReadyCallback  callback,
656                              gpointer             user_data)
657 {
658   GOutputStreamClass *class;
659   GSimpleAsyncResult *simple;
660
661   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
662   g_return_if_fail (buffer != NULL);
663
664   if (count == 0)
665     {
666       simple = g_simple_async_result_new (G_OBJECT (stream),
667                                           callback,
668                                           user_data,
669                                           g_output_stream_write_async);
670       g_simple_async_result_complete_in_idle (simple);
671       g_object_unref (simple);
672       return;
673     }
674
675   if (((gssize) count) < 0)
676     {
677       g_simple_async_report_error_in_idle (G_OBJECT (stream),
678                                            callback,
679                                            user_data,
680                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
681                                            _("Too large count value passed to g_output_stream_write_async"));
682       return;
683     }
684
685   if (stream->priv->closed)
686     {
687       g_simple_async_report_error_in_idle (G_OBJECT (stream),
688                                            callback,
689                                            user_data,
690                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
691                                            _("Stream is already closed"));
692       return;
693     }
694   
695   if (stream->priv->pending)
696     {
697       g_simple_async_report_error_in_idle (G_OBJECT (stream),
698                                            callback,
699                                            user_data,
700                                            G_IO_ERROR, G_IO_ERROR_PENDING,
701                                            _("Stream has outstanding operation"));
702       return;
703     }
704
705   class = G_OUTPUT_STREAM_GET_CLASS (stream);
706
707   stream->priv->pending = TRUE;
708   stream->priv->outstanding_callback = callback;
709   g_object_ref (stream);
710   class->write_async (stream, buffer, count, io_priority, cancellable,
711                       async_ready_callback_wrapper, user_data);
712 }
713
714 /**
715  * g_output_stream_write_finish:
716  * @stream: a #GOutputStream.
717  * @result: a #GAsyncResult.
718  * @error: a #GError location to store the error occuring, or %NULL to 
719  * ignore.
720  * 
721  * Finishes a stream write operation.
722  * 
723  * Returns: a #gssize containing the number of bytes written to the stream.
724  **/
725 gssize
726 g_output_stream_write_finish (GOutputStream  *stream,
727                               GAsyncResult   *result,
728                               GError        **error)
729 {
730   GSimpleAsyncResult *simple;
731   GOutputStreamClass *class;
732
733   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
734   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
735
736   if (G_IS_SIMPLE_ASYNC_RESULT (result))
737     {
738       simple = G_SIMPLE_ASYNC_RESULT (result);
739       if (g_simple_async_result_propagate_error (simple, error))
740         return -1;
741
742       /* Special case writes of 0 bytes */
743       if (g_simple_async_result_get_source_tag (simple) == g_output_stream_write_async)
744         return 0;
745     }
746   
747   class = G_OUTPUT_STREAM_GET_CLASS (stream);
748   return class->write_finish (stream, result, error);
749 }
750
751 typedef struct {
752   GInputStream *source;
753   gpointer user_data;
754   GAsyncReadyCallback callback;
755 } SpliceUserData;
756
757 static void
758 async_ready_splice_callback_wrapper (GObject      *source_object,
759                                      GAsyncResult *res,
760                                      gpointer     _data)
761 {
762   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
763   SpliceUserData *data = _data;
764   
765   stream->priv->pending = FALSE;
766   
767   if (data->callback)
768     (*data->callback) (source_object, res, data->user_data);
769   
770   g_object_unref (stream);
771   g_object_unref (data->source);
772   g_free (data);
773 }
774
775 /**
776  * g_output_stream_splice_async:
777  * @stream: a #GOutputStream.
778  * @source: a #GInputStream. 
779  * @flags: a set of #GOutputStreamSpliceFlags.
780  * @io_priority: the io priority of the request.
781  * @cancellable: optional #GCancellable object, %NULL to ignore. 
782  * @callback: a #GAsyncReadyCallback. 
783  * @user_data: user data passed to @callback.
784  * 
785  * Splices a stream asynchronously.
786  * 
787  **/
788 void
789 g_output_stream_splice_async (GOutputStream            *stream,
790                               GInputStream             *source,
791                               GOutputStreamSpliceFlags  flags,
792                               int                       io_priority,
793                               GCancellable             *cancellable,
794                               GAsyncReadyCallback       callback,
795                               gpointer                  user_data)
796 {
797   GOutputStreamClass *class;
798   SpliceUserData *data;
799
800   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
801   g_return_if_fail (G_IS_INPUT_STREAM (source));
802
803   if (stream->priv->closed)
804     {
805       g_simple_async_report_error_in_idle (G_OBJECT (stream),
806                                            callback,
807                                            user_data,
808                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
809                                            _("Target stream is already closed"));
810       return;
811     }
812
813   if (g_input_stream_is_closed (source))
814     {
815       g_simple_async_report_error_in_idle (G_OBJECT (stream),
816                                            callback,
817                                            user_data,
818                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
819                                            _("Source stream is already closed"));
820       return;
821     }
822   
823   if (stream->priv->pending)
824     {
825       g_simple_async_report_error_in_idle (G_OBJECT (stream),
826                                            callback,
827                                            user_data,
828                                            G_IO_ERROR, G_IO_ERROR_PENDING,
829                                            _("Stream has outstanding operation"));
830       return;
831     }
832
833   class = G_OUTPUT_STREAM_GET_CLASS (stream);
834
835   stream->priv->pending = TRUE;
836
837   data = g_new0 (SpliceUserData, 1);
838   data->callback = callback;
839   data->user_data = user_data;
840   data->source = g_object_ref (source);
841   
842   g_object_ref (stream);
843   class->splice_async (stream, source, flags, io_priority, cancellable,
844                       async_ready_splice_callback_wrapper, data);
845 }
846
847 /**
848  * g_output_stream_splice_finish:
849  * @stream: a #GOutputStream.
850  * @result: a #GAsyncResult.
851  * @error: a #GError location to store the error occuring, or %NULL to 
852  * ignore.
853  *
854  * Finishes an asynchronous stream splice operation.
855  * 
856  * Returns: a #gssize of the number of bytes spliced.
857  **/
858 gssize
859 g_output_stream_splice_finish (GOutputStream  *stream,
860                                GAsyncResult   *result,
861                                GError        **error)
862 {
863   GSimpleAsyncResult *simple;
864   GOutputStreamClass *class;
865
866   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
867   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
868
869   if (G_IS_SIMPLE_ASYNC_RESULT (result))
870     {
871       simple = G_SIMPLE_ASYNC_RESULT (result);
872       if (g_simple_async_result_propagate_error (simple, error))
873         return -1;
874     }
875   
876   class = G_OUTPUT_STREAM_GET_CLASS (stream);
877   return class->splice_finish (stream, result, error);
878 }
879
880 /**
881  * g_output_stream_flush_async:
882  * @stream: a #GOutputStream.
883  * @io_priority: the io priority of the request.
884  * @cancellable: optional #GCancellable object, %NULL to ignore.
885  * @callback: a #GAsyncReadyCallback to call when the request is satisfied
886  * @user_data: the data to pass to callback function
887  * 
888  * Flushes a stream asynchronously.
889  * 
890  **/
891 void
892 g_output_stream_flush_async (GOutputStream       *stream,
893                              int                  io_priority,
894                              GCancellable        *cancellable,
895                              GAsyncReadyCallback  callback,
896                              gpointer             user_data)
897 {
898   GOutputStreamClass *class;
899   GSimpleAsyncResult *simple;
900
901   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
902
903   if (stream->priv->closed)
904     {
905       g_simple_async_report_error_in_idle (G_OBJECT (stream),
906                                            callback,
907                                            user_data,
908                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
909                                            _("Stream is already closed"));
910       return;
911     }
912   
913   if (stream->priv->pending)
914     {
915       g_simple_async_report_error_in_idle (G_OBJECT (stream),
916                                            callback,
917                                            user_data,
918                                            G_IO_ERROR, G_IO_ERROR_PENDING,
919                                            _("Stream has outstanding operation"));
920       return;
921     }
922
923   class = G_OUTPUT_STREAM_GET_CLASS (stream);
924   
925   if (class->flush_async == NULL)
926     {
927       simple = g_simple_async_result_new (G_OBJECT (stream),
928                                           callback,
929                                           user_data,
930                                           g_output_stream_flush_async);
931       g_simple_async_result_complete_in_idle (simple);
932       g_object_unref (simple);
933       return;
934     }
935       
936   stream->priv->pending = TRUE;
937   stream->priv->outstanding_callback = callback;
938   g_object_ref (stream);
939   class->flush_async (stream, io_priority, cancellable,
940                       async_ready_callback_wrapper, user_data);
941 }
942
943 /**
944  * g_output_stream_flush_finish:
945  * @stream: a #GOutputStream.
946  * @result: a GAsyncResult.
947  * @error: a #GError location to store the error occuring, or %NULL to 
948  * ignore.
949  * 
950  * Finishes flushing an output stream.
951  * 
952  * Returns: %TRUE if flush operation suceeded, %FALSE otherwise.
953  **/
954 gboolean
955 g_output_stream_flush_finish (GOutputStream  *stream,
956                               GAsyncResult   *result,
957                               GError        **error)
958 {
959   GSimpleAsyncResult *simple;
960   GOutputStreamClass *klass;
961
962   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
963   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
964
965   if (G_IS_SIMPLE_ASYNC_RESULT (result))
966     {
967       simple = G_SIMPLE_ASYNC_RESULT (result);
968       if (g_simple_async_result_propagate_error (simple, error))
969         return FALSE;
970
971       /* Special case default implementation */
972       if (g_simple_async_result_get_source_tag (simple) == g_output_stream_flush_async)
973         return TRUE;
974     }
975
976   klass = G_OUTPUT_STREAM_GET_CLASS (stream);
977   return klass->flush_finish (stream, result, error);
978 }
979
980
981 /**
982  * g_output_stream_close_async:
983  * @stream: A #GOutputStream.
984  * @io_priority: the io priority of the request.
985  * @callback: callback to call when the request is satisfied
986  * @user_data: the data to pass to callback function
987  * @cancellable: optional cancellable object
988  *
989  * Requests an asynchronous close of the stream, releasing resources 
990  * related to it. When the operation is finished @callback will be 
991  * called, giving the results.
992  *
993  * For behaviour details see g_output_stream_close().
994  *
995  * The asyncronous methods have a default fallback that uses threads 
996  * to implement asynchronicity, so they are optional for inheriting 
997  * classes. However, if you override one you must override all.
998  **/
999 void
1000 g_output_stream_close_async (GOutputStream       *stream,
1001                              int                  io_priority,
1002                              GCancellable        *cancellable,
1003                              GAsyncReadyCallback  callback,
1004                              gpointer             user_data)
1005 {
1006   GOutputStreamClass *class;
1007   GSimpleAsyncResult *simple;
1008
1009   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1010   
1011   if (stream->priv->closed)
1012     {
1013       simple = g_simple_async_result_new (G_OBJECT (stream),
1014                                           callback,
1015                                           user_data,
1016                                           g_output_stream_close_async);
1017       g_simple_async_result_complete_in_idle (simple);
1018       g_object_unref (simple);
1019       return;
1020     }
1021
1022   if (stream->priv->pending)
1023     {
1024       g_simple_async_report_error_in_idle (G_OBJECT (stream),
1025                                            callback,
1026                                            user_data,
1027                                            G_IO_ERROR, G_IO_ERROR_PENDING,
1028                                            _("Stream has outstanding operation"));
1029       return;
1030     }
1031   
1032   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1033   stream->priv->pending = TRUE;
1034   stream->priv->outstanding_callback = callback;
1035   g_object_ref (stream);
1036   class->close_async (stream, io_priority, cancellable,
1037                       async_ready_close_callback_wrapper, user_data);
1038 }
1039
1040 /**
1041  * g_output_stream_close_finish:
1042  * @stream: a #GOutputStream.
1043  * @result: a #GAsyncResult.
1044  * @error: a #GError location to store the error occuring, or %NULL to 
1045  * ignore.
1046  * 
1047  * Closes an output stream.
1048  * 
1049  * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
1050  **/
1051 gboolean
1052 g_output_stream_close_finish (GOutputStream  *stream,
1053                               GAsyncResult   *result,
1054                               GError        **error)
1055 {
1056   GSimpleAsyncResult *simple;
1057   GOutputStreamClass *class;
1058
1059   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1060   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1061
1062   if (G_IS_SIMPLE_ASYNC_RESULT (result))
1063     {
1064       simple = G_SIMPLE_ASYNC_RESULT (result);
1065       if (g_simple_async_result_propagate_error (simple, error))
1066         return FALSE;
1067
1068       /* Special case already closed */
1069       if (g_simple_async_result_get_source_tag (simple) == g_output_stream_close_async)
1070         return TRUE;
1071     }
1072
1073   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1074   return class->close_finish (stream, result, error);
1075 }
1076
1077 /**
1078  * g_output_stream_is_closed:
1079  * @stream: a #GOutputStream.
1080  * 
1081  * Checks if an output stream has already been closed.
1082  * 
1083  * Returns: %TRUE if @stream is closed. %FALSE otherwise. 
1084  **/
1085 gboolean
1086 g_output_stream_is_closed (GOutputStream *stream)
1087 {
1088   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1089   
1090   return stream->priv->closed;
1091 }
1092
1093 /**
1094  * g_output_stream_has_pending:
1095  * @stream: a #GOutputStream.
1096  * 
1097  * Checks if an ouput stream has pending actions.
1098  * 
1099  * Returns: %TRUE if @stream has pending actions. 
1100  **/
1101 gboolean
1102 g_output_stream_has_pending (GOutputStream *stream)
1103 {
1104   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1105   
1106   return stream->priv->pending;
1107 }
1108
1109 /**
1110  * g_output_stream_set_pending:
1111  * @stream: a #GOutputStream.
1112  * @pending: a #gboolean.
1113  * 
1114  * Sets the @stream as having pending actions if @pending is %TRUE. 
1115  **/
1116 void
1117 g_output_stream_set_pending (GOutputStream *stream,
1118                             gboolean        pending)
1119 {
1120   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1121   
1122   stream->priv->pending = pending;
1123 }
1124
1125
1126 /********************************************
1127  *   Default implementation of async ops    *
1128  ********************************************/
1129
1130 typedef struct {
1131   const void         *buffer;
1132   gsize               count_requested;
1133   gssize              count_written;
1134 } WriteData;
1135
1136 static void
1137 write_async_thread (GSimpleAsyncResult *res,
1138                     GObject            *object,
1139                     GCancellable       *cancellable)
1140 {
1141   WriteData *op;
1142   GOutputStreamClass *class;
1143   GError *error = NULL;
1144
1145   class = G_OUTPUT_STREAM_GET_CLASS (object);
1146   op = g_simple_async_result_get_op_res_gpointer (res);
1147   op->count_written = class->write (G_OUTPUT_STREAM (object), op->buffer, op->count_requested,
1148                                     cancellable, &error);
1149   if (op->count_written == -1)
1150     {
1151       g_simple_async_result_set_from_error (res, error);
1152       g_error_free (error);
1153     }
1154 }
1155
1156 static void
1157 g_output_stream_real_write_async (GOutputStream       *stream,
1158                                   const void          *buffer,
1159                                   gsize                count,
1160                                   int                  io_priority,
1161                                   GCancellable        *cancellable,
1162                                   GAsyncReadyCallback  callback,
1163                                   gpointer             user_data)
1164 {
1165   GSimpleAsyncResult *res;
1166   WriteData *op;
1167
1168   op = g_new0 (WriteData, 1);
1169   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async);
1170   g_simple_async_result_set_op_res_gpointer (res, op, g_free);
1171   op->buffer = buffer;
1172   op->count_requested = count;
1173   
1174   g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable);
1175   g_object_unref (res);
1176 }
1177
1178 static gssize
1179 g_output_stream_real_write_finish (GOutputStream  *stream,
1180                                    GAsyncResult   *result,
1181                                    GError        **error)
1182 {
1183   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1184   WriteData *op;
1185
1186   g_assert (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_write_async);
1187   op = g_simple_async_result_get_op_res_gpointer (simple);
1188   return op->count_written;
1189 }
1190
1191 typedef struct {
1192   GInputStream *source;
1193   GOutputStreamSpliceFlags flags;
1194   gssize bytes_copied;
1195 } SpliceData;
1196
1197 static void
1198 splice_async_thread (GSimpleAsyncResult *result,
1199                      GObject            *object,
1200                      GCancellable       *cancellable)
1201 {
1202   SpliceData *op;
1203   GOutputStreamClass *class;
1204   GError *error = NULL;
1205   GOutputStream *stream;
1206
1207   stream = G_OUTPUT_STREAM (object);
1208   class = G_OUTPUT_STREAM_GET_CLASS (object);
1209   op = g_simple_async_result_get_op_res_gpointer (result);
1210   
1211   op->bytes_copied = class->splice (stream,
1212                                     op->source,
1213                                     op->flags,
1214                                     cancellable,
1215                                     &error);
1216
1217   if (op->bytes_copied == -1)
1218     {
1219       g_simple_async_result_set_from_error (result, error);
1220       g_error_free (error);
1221     }
1222 }
1223
1224 static void
1225 g_output_stream_real_splice_async (GOutputStream             *stream,
1226                                    GInputStream              *source,
1227                                    GOutputStreamSpliceFlags   flags,
1228                                    int                        io_priority,
1229                                    GCancellable              *cancellable,
1230                                    GAsyncReadyCallback        callback,
1231                                    gpointer                   user_data)
1232 {
1233   GSimpleAsyncResult *res;
1234   SpliceData *op;
1235
1236   op = g_new0 (SpliceData, 1);
1237   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_splice_async);
1238   g_simple_async_result_set_op_res_gpointer (res, op, g_free);
1239   op->flags = flags;
1240   op->source = source;
1241
1242   /* TODO: In the case where both source and destintion have
1243      non-threadbased async calls we can use a true async copy here */
1244   
1245   g_simple_async_result_run_in_thread (res, splice_async_thread, io_priority, cancellable);
1246   g_object_unref (res);
1247 }
1248
1249 static gssize
1250 g_output_stream_real_splice_finish (GOutputStream  *stream,
1251                                     GAsyncResult   *result,
1252                                     GError        **error)
1253 {
1254   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1255   SpliceData *op;
1256
1257   g_assert (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_splice_async);
1258   op = g_simple_async_result_get_op_res_gpointer (simple);
1259   return op->bytes_copied;
1260 }
1261
1262
1263 static void
1264 flush_async_thread (GSimpleAsyncResult *res,
1265                     GObject            *object,
1266                     GCancellable       *cancellable)
1267 {
1268   GOutputStreamClass *class;
1269   gboolean result;
1270   GError *error = NULL;
1271
1272   class = G_OUTPUT_STREAM_GET_CLASS (object);
1273   result = TRUE;
1274   if (class->flush)
1275     result = class->flush (G_OUTPUT_STREAM (object), cancellable, &error);
1276
1277   if (!result)
1278     {
1279       g_simple_async_result_set_from_error (res, error);
1280       g_error_free (error);
1281     }
1282 }
1283
1284 static void
1285 g_output_stream_real_flush_async (GOutputStream       *stream,
1286                                   int                  io_priority,
1287                                   GCancellable        *cancellable,
1288                                   GAsyncReadyCallback  callback,
1289                                   gpointer             user_data)
1290 {
1291   GSimpleAsyncResult *res;
1292
1293   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async);
1294   
1295   g_simple_async_result_run_in_thread (res, flush_async_thread, io_priority, cancellable);
1296   g_object_unref (res);
1297 }
1298
1299 static gboolean
1300 g_output_stream_real_flush_finish (GOutputStream  *stream,
1301                                    GAsyncResult   *result,
1302                                    GError        **error)
1303 {
1304   return TRUE;
1305 }
1306
1307 static void
1308 close_async_thread (GSimpleAsyncResult *res,
1309                     GObject            *object,
1310                     GCancellable       *cancellable)
1311 {
1312   GOutputStreamClass *class;
1313   GError *error = NULL;
1314   gboolean result;
1315
1316   /* Auto handling of cancelation disabled, and ignore
1317      cancellation, since we want to close things anyway, although
1318      possibly in a quick-n-dirty way. At least we never want to leak
1319      open handles */
1320   
1321   class = G_OUTPUT_STREAM_GET_CLASS (object);
1322   result = class->close (G_OUTPUT_STREAM (object), cancellable, &error);
1323   if (!result)
1324     {
1325       g_simple_async_result_set_from_error (res, error);
1326       g_error_free (error);
1327     }
1328 }
1329
1330 static void
1331 g_output_stream_real_close_async (GOutputStream       *stream,
1332                                   int                  io_priority,
1333                                   GCancellable        *cancellable,
1334                                   GAsyncReadyCallback  callback,
1335                                   gpointer             user_data)
1336 {
1337   GSimpleAsyncResult *res;
1338   
1339   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_close_async);
1340
1341   g_simple_async_result_set_handle_cancellation (res, FALSE);
1342   
1343   g_simple_async_result_run_in_thread (res, close_async_thread, io_priority, cancellable);
1344   g_object_unref (res);
1345 }
1346
1347 static gboolean
1348 g_output_stream_real_close_finish (GOutputStream  *stream,
1349                                    GAsyncResult   *result,
1350                                    GError        **error)
1351 {
1352   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1353   g_assert (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_close_async);
1354   return TRUE;
1355 }
1356
1357 #define __G_OUTPUT_STREAM_C__
1358 #include "gioaliasdef.c"