Added. Added. Added. Added.
[platform/upstream/glib.git] / gio / goutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Alexander Larsson <alexl@redhat.com>
21  */
22
23 #include <config.h>
24 #include "goutputstream.h"
25 #include "gsimpleasyncresult.h"
26 #include "glibintl.h"
27
28 #include "gioalias.h"
29
30 /**
31  * SECTION:goutputstream
32  * @short_description: base class for implementing streaming output
33  * 
34  * 
35  *
36  **/
37
38 G_DEFINE_TYPE (GOutputStream, g_output_stream, G_TYPE_OBJECT);
39
40 struct _GOutputStreamPrivate {
41   guint closed : 1;
42   guint pending : 1;
43   guint cancelled : 1;
44   GAsyncReadyCallback outstanding_callback;
45 };
46
47 static gssize   g_output_stream_real_splice        (GOutputStream             *stream,
48                                                     GInputStream              *source,
49                                                     GOutputStreamSpliceFlags   flags,
50                                                     GCancellable              *cancellable,
51                                                     GError                   **error);
52 static void     g_output_stream_real_write_async   (GOutputStream             *stream,
53                                                     const void                *buffer,
54                                                     gsize                      count,
55                                                     int                        io_priority,
56                                                     GCancellable              *cancellable,
57                                                     GAsyncReadyCallback        callback,
58                                                     gpointer                   data);
59 static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
60                                                     GAsyncResult              *result,
61                                                     GError                   **error);
62 static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
63                                                     GInputStream              *source,
64                                                     GOutputStreamSpliceFlags   flags,
65                                                     int                        io_priority,
66                                                     GCancellable              *cancellable,
67                                                     GAsyncReadyCallback        callback,
68                                                     gpointer                   data);
69 static gssize   g_output_stream_real_splice_finish (GOutputStream             *stream,
70                                                     GAsyncResult              *result,
71                                                     GError                   **error);
72 static void     g_output_stream_real_flush_async   (GOutputStream             *stream,
73                                                     int                        io_priority,
74                                                     GCancellable              *cancellable,
75                                                     GAsyncReadyCallback        callback,
76                                                     gpointer                   data);
77 static gboolean g_output_stream_real_flush_finish  (GOutputStream             *stream,
78                                                     GAsyncResult              *result,
79                                                     GError                   **error);
80 static void     g_output_stream_real_close_async   (GOutputStream             *stream,
81                                                     int                        io_priority,
82                                                     GCancellable              *cancellable,
83                                                     GAsyncReadyCallback        callback,
84                                                     gpointer                   data);
85 static gboolean g_output_stream_real_close_finish  (GOutputStream             *stream,
86                                                     GAsyncResult              *result,
87                                                     GError                   **error);
88
89 static void
90 g_output_stream_finalize (GObject *object)
91 {
92   GOutputStream *stream;
93
94   stream = G_OUTPUT_STREAM (object);
95   
96   if (G_OBJECT_CLASS (g_output_stream_parent_class)->finalize)
97     (*G_OBJECT_CLASS (g_output_stream_parent_class)->finalize) (object);
98 }
99
100 static void
101 g_output_stream_dispose (GObject *object)
102 {
103   GOutputStream *stream;
104
105   stream = G_OUTPUT_STREAM (object);
106   
107   if (!stream->priv->closed)
108     g_output_stream_close (stream, NULL, NULL);
109   
110   if (G_OBJECT_CLASS (g_output_stream_parent_class)->dispose)
111     (*G_OBJECT_CLASS (g_output_stream_parent_class)->dispose) (object);
112 }
113
114 static void
115 g_output_stream_class_init (GOutputStreamClass *klass)
116 {
117   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
118   
119   g_type_class_add_private (klass, sizeof (GOutputStreamPrivate));
120   
121   gobject_class->finalize = g_output_stream_finalize;
122   gobject_class->dispose = g_output_stream_dispose;
123
124   klass->splice = g_output_stream_real_splice;
125   
126   klass->write_async = g_output_stream_real_write_async;
127   klass->write_finish = g_output_stream_real_write_finish;
128   klass->splice_async = g_output_stream_real_splice_async;
129   klass->splice_finish = g_output_stream_real_splice_finish;
130   klass->flush_async = g_output_stream_real_flush_async;
131   klass->flush_finish = g_output_stream_real_flush_finish;
132   klass->close_async = g_output_stream_real_close_async;
133   klass->close_finish = g_output_stream_real_close_finish;
134 }
135
136 static void
137 g_output_stream_init (GOutputStream *stream)
138 {
139   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
140                                               G_TYPE_OUTPUT_STREAM,
141                                               GOutputStreamPrivate);
142 }
143
144 /**
145  * g_output_stream_write:
146  * @stream: a #GOutputStream.
147  * @buffer: the buffer containing the data to write. 
148  * @count: the number of bytes to write
149  * @cancellable: optional cancellable object
150  * @error: location to store the error occuring, or %NULL to ignore
151  *
152  * Tries to write @count bytes from @buffer into the stream. Will block
153  * during the operation.
154  * 
155  * If count is zero returns zero and does nothing. A value of @count
156  * larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
157  *
158  * On success, the number of bytes written to the stream is returned.
159  * It is not an error if this is not the same as the requested size, as it
160  * can happen e.g. on a partial i/o error, or if the there is not enough
161  * storage in the stream. All writes either block until at least one byte
162  * is written, so zero is never returned (unless @count is zero).
163  * 
164  * If @cancellable is not NULL, then the operation can be cancelled by
165  * triggering the cancellable object from another thread. If the operation
166  * was cancelled, the error G_IO_ERROR_CANCELLED will be returned. If an
167  * operation was partially finished when the operation was cancelled the
168  * partial result will be returned, without an error.
169  *
170  * On error -1 is returned and @error is set accordingly.
171  * 
172  * Return value: Number of bytes written, or -1 on error
173  **/
174 gssize
175 g_output_stream_write (GOutputStream *stream,
176                        const void    *buffer,
177                        gsize          count,
178                        GCancellable  *cancellable,
179                        GError       **error)
180 {
181   GOutputStreamClass *class;
182   gssize res;
183
184   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
185   g_return_val_if_fail (buffer != NULL, 0);
186
187   if (count == 0)
188     return 0;
189   
190   if (((gssize) count) < 0)
191     {
192       g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
193                    _("Too large count value passed to g_output_stream_write"));
194       return -1;
195     }
196
197   if (stream->priv->closed)
198     {
199       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
200                    _("Stream is already closed"));
201       return -1;
202     }
203   
204   if (stream->priv->pending)
205     {
206       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
207                    _("Stream has outstanding operation"));
208       return -1;
209     }
210   
211   class = G_OUTPUT_STREAM_GET_CLASS (stream);
212
213   if (class->write == NULL) 
214     {
215       g_set_error (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
216                    _("Output stream doesn't implement write"));
217       return -1;
218     }
219   
220   if (cancellable)
221     g_push_current_cancellable (cancellable);
222   
223   stream->priv->pending = TRUE;
224   res = class->write (stream, buffer, count, cancellable, error);
225   stream->priv->pending = FALSE;
226   
227   if (cancellable)
228     g_pop_current_cancellable (cancellable);
229   
230   return res; 
231 }
232
233 /**
234  * g_output_stream_write_all:
235  * @stream: a #GOutputStream.
236  * @buffer: the buffer containing the data to write. 
237  * @count: the number of bytes to write
238  * @bytes_written: location to store the number of bytes that was written to the stream
239  * @cancellable: optional #GCancellable object, %NULL to ignore.
240  * @error: location to store the error occuring, or %NULL to ignore
241  *
242  * Tries to write @count bytes from @buffer into the stream. Will block
243  * during the operation.
244  * 
245  * This function is similar to g_output_stream_write(), except it tries to
246  * write as many bytes as requested, only stopping on an error.
247  *
248  * On a successful write of @count bytes, %TRUE is returned, and @bytes_written
249  * is set to @count.
250  * 
251  * If there is an error during the operation FALSE is returned and @error
252  * is set to indicate the error status, @bytes_written is updated to contain
253  * the number of bytes written into the stream before the error occured.
254  *
255  * Return value: %TRUE on success, %FALSE if there was an error
256  **/
257 gboolean
258 g_output_stream_write_all (GOutputStream *stream,
259                            const void    *buffer,
260                            gsize          count,
261                            gsize         *bytes_written,
262                            GCancellable  *cancellable,
263                            GError       **error)
264 {
265   gsize _bytes_written;
266   gssize res;
267
268   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
269   g_return_val_if_fail (buffer != NULL, FALSE);
270
271   _bytes_written = 0;
272   while (_bytes_written < count)
273     {
274       res = g_output_stream_write (stream, (char *)buffer + _bytes_written, count - _bytes_written,
275                                    cancellable, error);
276       if (res == -1)
277         {
278           if (bytes_written)
279             *bytes_written = _bytes_written;
280           return FALSE;
281         }
282       
283       if (res == 0)
284         g_warning ("Write returned zero without error");
285
286       _bytes_written += res;
287     }
288   
289   if (bytes_written)
290     *bytes_written = _bytes_written;
291   return TRUE;
292 }
293
294 /**
295  * g_output_stream_flush:
296  * @stream: a #GOutputStream.
297  * @cancellable: optional cancellable object
298  * @error: location to store the error occuring, or %NULL to ignore
299  *
300  * Flushed any outstanding buffers in the stream. Will block during the operation.
301  * Closing the stream will implicitly cause a flush.
302  *
303  * This function is optional for inherited classes.
304  * 
305  * If @cancellable is not %NULL, then the operation can be cancelled by
306  * triggering the cancellable object from another thread. If the operation
307  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
308  *
309  * Return value: %TRUE on success, %FALSE on error
310  **/
311 gboolean
312 g_output_stream_flush (GOutputStream    *stream,
313                        GCancellable  *cancellable,
314                        GError          **error)
315 {
316   GOutputStreamClass *class;
317   gboolean res;
318
319   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
320
321   if (stream->priv->closed)
322     {
323       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
324                    _("Stream is already closed"));
325       return FALSE;
326     }
327
328   if (stream->priv->pending)
329     {
330       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
331                    _("Stream has outstanding operation"));
332       return FALSE;
333     }
334   
335   class = G_OUTPUT_STREAM_GET_CLASS (stream);
336
337   res = TRUE;
338   if (class->flush)
339     {
340       if (cancellable)
341         g_push_current_cancellable (cancellable);
342       
343       stream->priv->pending = TRUE;
344       res = class->flush (stream, cancellable, error);
345       stream->priv->pending = FALSE;
346       
347       if (cancellable)
348         g_pop_current_cancellable (cancellable);
349     }
350   
351   return res;
352 }
353
354 /**
355  * g_output_stream_splice:
356  * @stream: a #GOutputStream.
357  * @source: a #GInputStream.
358  * @flags: a set of #GOutputStreamSpliceFlags.
359  * @cancellable: optional #GCancellable object, %NULL to ignore. 
360  * @error: a #GError location to store the error occuring, or %NULL to 
361  * ignore.
362  *
363  * Splices an input stream into an output stream.
364  *
365  * Returns: a #gssize containig the size of the data spliced.
366  **/
367 gssize
368 g_output_stream_splice (GOutputStream *stream,
369                         GInputStream *source,
370                         GOutputStreamSpliceFlags flags,
371                         GCancellable  *cancellable,
372                         GError **error)
373 {
374   GOutputStreamClass *class;
375   gboolean res;
376
377   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
378   g_return_val_if_fail (G_IS_INPUT_STREAM (source), -1);
379
380   if (stream->priv->closed)
381     {
382       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
383                    _("Target stream is already closed"));
384       return -1;
385     }
386
387   if (g_input_stream_is_closed (source))
388     {
389       g_set_error (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
390                    _("Source stream is already closed"));
391       return -1;
392     }
393
394   if (stream->priv->pending)
395     {
396       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
397                    _("Stream has outstanding operation"));
398       return -1;
399     }
400   
401   class = G_OUTPUT_STREAM_GET_CLASS (stream);
402
403   res = TRUE;
404   if (cancellable)
405     g_push_current_cancellable (cancellable);
406       
407   stream->priv->pending = TRUE;
408   res = class->splice (stream, source, flags, cancellable, error);
409   stream->priv->pending = FALSE;
410       
411   if (cancellable)
412     g_pop_current_cancellable (cancellable);
413   
414   return res;
415 }
416
417 static gssize
418 g_output_stream_real_splice (GOutputStream *stream,
419                              GInputStream *source,
420                              GOutputStreamSpliceFlags flags,
421                              GCancellable  *cancellable,
422                              GError **error)
423 {
424   gssize n_read, n_written;
425   gssize bytes_copied;
426   char buffer[8192], *p;
427   gboolean res;
428
429   bytes_copied = 0;
430   res = TRUE;
431   do 
432     {
433       n_read = g_input_stream_read (source, buffer, sizeof (buffer), cancellable, error);
434       if (n_read == -1)
435         {
436           res = FALSE;
437           break;
438         }
439         
440       if (n_read == 0)
441         break;
442
443       p = buffer;
444       while (n_read > 0)
445         {
446           stream->priv->pending = FALSE;
447           n_written = g_output_stream_write (stream, p, n_read, cancellable, error);
448           stream->priv->pending = TRUE;
449           if (n_written == -1)
450             {
451               res = FALSE;
452               break;
453             }
454
455           p += n_written;
456           n_read -= n_written;
457           bytes_copied += n_written;
458         }
459     }
460   while (res);
461
462   if (!res)
463     error = NULL; /* Ignore further errors */
464
465   if (flags & G_OUTPUT_STREAM_SPLICE_FLAGS_CLOSE_SOURCE)
466     {
467       /* Don't care about errors in source here */
468       g_input_stream_close (source, cancellable, NULL);
469     }
470
471   if (flags & G_OUTPUT_STREAM_SPLICE_FLAGS_CLOSE_TARGET)
472     {
473       /* But write errors on close are bad! */
474       stream->priv->pending = FALSE;
475       if (!g_output_stream_close (stream, cancellable, error))
476         res = FALSE;
477       stream->priv->pending = TRUE;
478     }
479
480   if (res)
481     return bytes_copied;
482   
483   return -1;
484 }
485
486
487 /**
488  * g_output_stream_close:
489  * @stream: A #GOutputStream.
490  * @cancellable: optional cancellable object
491  * @error: location to store the error occuring, or %NULL to ignore
492  *
493  * Closes the stream, releasing resources related to it.
494  *
495  * Once the stream is closed, all other operations will return %G_IO_ERROR_CLOSED.
496  * Closing a stream multiple times will not return an error.
497  *
498  * Closing a stream will automatically flush any outstanding buffers in the
499  * stream.
500  *
501  * Streams will be automatically closed when the last reference
502  * is dropped, but you might want to call make sure resources
503  * are released as early as possible.
504  *
505  * Some streams might keep the backing store of the stream (e.g. a file descriptor)
506  * open after the stream is closed. See the documentation for the individual
507  * stream for details.
508  *
509  * On failure the first error that happened will be reported, but the close
510  * operation will finish as much as possible. A stream that failed to
511  * close will still return %G_IO_ERROR_CLOSED all operations. Still, it
512  * is important to check and report the error to the user, otherwise
513  * there might be a loss of data as all data might not be written.
514  * 
515  * If @cancellable is not NULL, then the operation can be cancelled by
516  * triggering the cancellable object from another thread. If the operation
517  * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned.
518  * Cancelling a close will still leave the stream closed, but there some streams
519  * can use a faster close that doesn't block to e.g. check errors. On
520  * cancellation (as with any error) there is no guarantee that all written
521  * data will reach the target. 
522  *
523  * Return value: %TRUE on success, %FALSE on failure
524  **/
525 gboolean
526 g_output_stream_close (GOutputStream  *stream,
527                        GCancellable   *cancellable,
528                        GError        **error)
529 {
530   GOutputStreamClass *class;
531   gboolean res;
532
533   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
534
535   class = G_OUTPUT_STREAM_GET_CLASS (stream);
536
537   if (stream->priv->closed)
538     return TRUE;
539
540   if (stream->priv->pending)
541     {
542       g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
543                    _("Stream has outstanding operation"));
544       return FALSE;
545     }
546
547   res = g_output_stream_flush (stream, cancellable, error);
548
549   stream->priv->pending = TRUE;
550   
551   if (cancellable)
552     g_push_current_cancellable (cancellable);
553
554   if (!res)
555     {
556       /* flushing caused the error that we want to return,
557        * but we still want to close the underlying stream if possible
558        */
559       if (class->close)
560         class->close (stream, cancellable, NULL);
561     }
562   else
563     {
564       res = TRUE;
565       if (class->close)
566         res = class->close (stream, cancellable, error);
567     }
568   
569   if (cancellable)
570     g_pop_current_cancellable (cancellable);
571   
572   stream->priv->closed = TRUE;
573   stream->priv->pending = FALSE;
574   
575   return res;
576 }
577
578 static void
579 async_ready_callback_wrapper (GObject *source_object,
580                               GAsyncResult *res,
581                               gpointer      user_data)
582 {
583   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
584
585   stream->priv->pending = FALSE;
586   if (stream->priv->outstanding_callback)
587     (*stream->priv->outstanding_callback) (source_object, res, user_data);
588   g_object_unref (stream);
589 }
590
591 static void
592 async_ready_close_callback_wrapper (GObject *source_object,
593                                     GAsyncResult *res,
594                                     gpointer user_data)
595 {
596   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
597
598   stream->priv->pending = FALSE;
599   stream->priv->closed = TRUE;
600   if (stream->priv->outstanding_callback)
601     (*stream->priv->outstanding_callback) (source_object, res, user_data);
602   g_object_unref (stream);
603 }
604
605 /**
606  * g_output_stream_write_async:
607  * @stream: A #GOutputStream.
608  * @buffer: the buffer containing the data to write. 
609  * @count: the number of bytes to write
610  * @io_priority: the io priority of the request. the io priority of the request
611  * @cancellable: optional #GCancellable object, %NULL to ignore.
612  * @callback: callback to call when the request is satisfied
613  * @user_data: the data to pass to callback function
614  *
615  * Request an asynchronous write of @count bytes from @buffer into the stream.
616  * When the operation is finished @callback will be called, giving the results.
617  *
618  * During an async request no other sync and async calls are allowed, and will
619  * result in %G_IO_ERROR_PENDING errors. 
620  *
621  * A value of @count larger than %G_MAXSSIZE will cause a %G_IO_ERROR_INVALID_ARGUMENT error.
622  *
623  * On success, the number of bytes written will be passed to the
624  * @callback. It is not an error if this is not the same as the requested size, as it
625  * can happen e.g. on a partial i/o error, but generally we try to write
626  * as many bytes as requested. 
627  *
628  * Any outstanding i/o request with higher priority (lower numerical value) will
629  * be executed before an outstanding request with lower priority. Default
630  * priority is %G_PRIORITY_DEFAULT.
631  *
632  * The asyncronous methods have a default fallback that uses threads to implement
633  * asynchronicity, so they are optional for inheriting classes. However, if you
634  * override one you must override all.
635  *
636  * For the synchronous, blocking version of this function, see g_output_stream_write().
637  **/
638 void
639 g_output_stream_write_async (GOutputStream       *stream,
640                              const void          *buffer,
641                              gsize                count,
642                              int                  io_priority,
643                              GCancellable        *cancellable,
644                              GAsyncReadyCallback  callback,
645                              gpointer             user_data)
646 {
647   GOutputStreamClass *class;
648   GSimpleAsyncResult *simple;
649
650   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
651   g_return_if_fail (buffer != NULL);
652
653   if (count == 0)
654     {
655       simple = g_simple_async_result_new (G_OBJECT (stream),
656                                           callback,
657                                           user_data,
658                                           g_output_stream_write_async);
659       g_simple_async_result_complete_in_idle (simple);
660       g_object_unref (simple);
661       return;
662     }
663
664   if (((gssize) count) < 0)
665     {
666       g_simple_async_report_error_in_idle (G_OBJECT (stream),
667                                            callback,
668                                            user_data,
669                                            G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
670                                            _("Too large count value passed to g_output_stream_write_async"));
671       return;
672     }
673
674   if (stream->priv->closed)
675     {
676       g_simple_async_report_error_in_idle (G_OBJECT (stream),
677                                            callback,
678                                            user_data,
679                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
680                                            _("Stream is already closed"));
681       return;
682     }
683   
684   if (stream->priv->pending)
685     {
686       g_simple_async_report_error_in_idle (G_OBJECT (stream),
687                                            callback,
688                                            user_data,
689                                            G_IO_ERROR, G_IO_ERROR_PENDING,
690                                            _("Stream has outstanding operation"));
691       return;
692     }
693
694   class = G_OUTPUT_STREAM_GET_CLASS (stream);
695
696   stream->priv->pending = TRUE;
697   stream->priv->outstanding_callback = callback;
698   g_object_ref (stream);
699   class->write_async (stream, buffer, count, io_priority, cancellable,
700                       async_ready_callback_wrapper, user_data);
701 }
702
703 /**
704  * g_output_stream_write_finish:
705  * @stream: a #GOutputStream.
706  * @result: a #GAsyncResult.
707  * @error: a #GError location to store the error occuring, or %NULL to 
708  * ignore.
709  * 
710  * Finishes a stream write operation.
711  * 
712  * Returns: a #gssize containing the number of bytes written to the stream.
713  **/
714 gssize
715 g_output_stream_write_finish (GOutputStream *stream,
716                               GAsyncResult *result,
717                               GError **error)
718 {
719   GSimpleAsyncResult *simple;
720   GOutputStreamClass *class;
721
722   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
723   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
724
725   if (G_IS_SIMPLE_ASYNC_RESULT (result))
726     {
727       simple = G_SIMPLE_ASYNC_RESULT (result);
728       if (g_simple_async_result_propagate_error (simple, error))
729         return -1;
730
731       /* Special case writes of 0 bytes */
732       if (g_simple_async_result_get_source_tag (simple) == g_output_stream_write_async)
733         return 0;
734     }
735   
736   class = G_OUTPUT_STREAM_GET_CLASS (stream);
737   return class->write_finish (stream, result, error);
738 }
739
740 typedef struct {
741   GInputStream *source;
742   gpointer user_data;
743   GAsyncReadyCallback callback;
744 } SpliceUserData;
745
746 static void
747 async_ready_splice_callback_wrapper (GObject *source_object,
748                                      GAsyncResult *res,
749                                      gpointer _data)
750 {
751   GOutputStream *stream = G_OUTPUT_STREAM (source_object);
752   SpliceUserData *data = _data;
753   
754   stream->priv->pending = FALSE;
755   
756   if (data->callback)
757     (*data->callback) (source_object, res, data->user_data);
758   
759   g_object_unref (stream);
760   g_object_unref (data->source);
761   g_free (data);
762 }
763
764 /**
765  * g_output_stream_splice_async:
766  * @stream: a #GOutputStream.
767  * @source: a #GInputStream. 
768  * @flags: a set of #GOutputStreamSpliceFlags.
769  * @io_priority: the io priority of the request.
770  * @cancellable: optional #GCancellable object, %NULL to ignore. 
771  * @callback: a #GAsyncReadyCallback. 
772  * @user_data: user data passed to @callback.
773  * 
774  * Splices a stream asynchronously.
775  * 
776  **/
777 void
778 g_output_stream_splice_async (GOutputStream             *stream,
779                               GInputStream              *source,
780                               GOutputStreamSpliceFlags   flags,
781                               int                        io_priority,
782                               GCancellable              *cancellable,
783                               GAsyncReadyCallback        callback,
784                               gpointer                   user_data)
785 {
786   GOutputStreamClass *class;
787   SpliceUserData *data;
788
789   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
790   g_return_if_fail (G_IS_INPUT_STREAM (source));
791
792   if (stream->priv->closed)
793     {
794       g_simple_async_report_error_in_idle (G_OBJECT (stream),
795                                            callback,
796                                            user_data,
797                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
798                                            _("Target stream is already closed"));
799       return;
800     }
801
802   if (g_input_stream_is_closed (source))
803     {
804       g_simple_async_report_error_in_idle (G_OBJECT (stream),
805                                            callback,
806                                            user_data,
807                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
808                                            _("Source stream is already closed"));
809       return;
810     }
811   
812   if (stream->priv->pending)
813     {
814       g_simple_async_report_error_in_idle (G_OBJECT (stream),
815                                            callback,
816                                            user_data,
817                                            G_IO_ERROR, G_IO_ERROR_PENDING,
818                                            _("Stream has outstanding operation"));
819       return;
820     }
821
822   class = G_OUTPUT_STREAM_GET_CLASS (stream);
823
824   stream->priv->pending = TRUE;
825
826   data = g_new0 (SpliceUserData, 1);
827   data->callback = callback;
828   data->user_data = user_data;
829   data->source = g_object_ref (source);
830   
831   g_object_ref (stream);
832   class->splice_async (stream, source, flags, io_priority, cancellable,
833                       async_ready_splice_callback_wrapper, data);
834 }
835
836 /**
837  * g_output_stream_splice_finish:
838  * @stream: a #GOutputStream.
839  * @result: a #GAsyncResult.
840  * @error: a #GError location to store the error occuring, or %NULL to 
841  * ignore.
842  *
843  * Finishes an asynchronous stream splice operation.
844  * 
845  * Returns: a #gssize of the number of bytes spliced.
846  **/
847 gssize
848 g_output_stream_splice_finish (GOutputStream             *stream,
849                                GAsyncResult              *result,
850                                GError                   **error)
851 {
852   GSimpleAsyncResult *simple;
853   GOutputStreamClass *class;
854
855   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), -1);
856   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), -1);
857
858   if (G_IS_SIMPLE_ASYNC_RESULT (result))
859     {
860       simple = G_SIMPLE_ASYNC_RESULT (result);
861       if (g_simple_async_result_propagate_error (simple, error))
862         return -1;
863     }
864   
865   class = G_OUTPUT_STREAM_GET_CLASS (stream);
866   return class->splice_finish (stream, result, error);
867 }
868
869 /**
870  * g_output_stream_flush_async:
871  * @stream: a #GOutputStream.
872  * @io_priority: the io priority of the request.
873  * @cancellable: optional #GCancellable object, %NULL to ignore.
874  * @callback: a #GAsyncReadyCallback to call when the request is satisfied
875  * @user_data: the data to pass to callback function
876  * 
877  * Flushes a stream asynchronously.
878  * 
879  **/
880 void
881 g_output_stream_flush_async (GOutputStream       *stream,
882                              int                  io_priority,
883                              GCancellable        *cancellable,
884                              GAsyncReadyCallback  callback,
885                              gpointer             user_data)
886 {
887   GOutputStreamClass *class;
888   GSimpleAsyncResult *simple;
889
890   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
891
892   if (stream->priv->closed)
893     {
894       g_simple_async_report_error_in_idle (G_OBJECT (stream),
895                                            callback,
896                                            user_data,
897                                            G_IO_ERROR, G_IO_ERROR_CLOSED,
898                                            _("Stream is already closed"));
899       return;
900     }
901   
902   if (stream->priv->pending)
903     {
904       g_simple_async_report_error_in_idle (G_OBJECT (stream),
905                                            callback,
906                                            user_data,
907                                            G_IO_ERROR, G_IO_ERROR_PENDING,
908                                            _("Stream has outstanding operation"));
909       return;
910     }
911
912   class = G_OUTPUT_STREAM_GET_CLASS (stream);
913   
914   if (class->flush_async == NULL)
915     {
916       simple = g_simple_async_result_new (G_OBJECT (stream),
917                                           callback,
918                                           user_data,
919                                           g_output_stream_flush_async);
920       g_simple_async_result_complete_in_idle (simple);
921       g_object_unref (simple);
922       return;
923     }
924       
925   stream->priv->pending = TRUE;
926   stream->priv->outstanding_callback = callback;
927   g_object_ref (stream);
928   class->flush_async (stream, io_priority, cancellable,
929                       async_ready_callback_wrapper, user_data);
930 }
931
932 /**
933  * g_output_stream_flush_finish:
934  * @stream: a #GOutputStream.
935  * @result: a GAsyncResult.
936  * @error: a #GError location to store the error occuring, or %NULL to 
937  * ignore.
938  * 
939  * Finishes flushing an output stream.
940  * 
941  * Returns: %TRUE if flush operation suceeded, %FALSE otherwise.
942  **/
943 gboolean
944 g_output_stream_flush_finish (GOutputStream *stream,
945                               GAsyncResult *result,
946                               GError **error)
947 {
948   GSimpleAsyncResult *simple;
949   GOutputStreamClass *klass;
950
951   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
952   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
953
954   if (G_IS_SIMPLE_ASYNC_RESULT (result))
955     {
956       simple = G_SIMPLE_ASYNC_RESULT (result);
957       if (g_simple_async_result_propagate_error (simple, error))
958         return FALSE;
959
960       /* Special case default implementation */
961       if (g_simple_async_result_get_source_tag (simple) == g_output_stream_flush_async)
962         return TRUE;
963     }
964
965   klass = G_OUTPUT_STREAM_GET_CLASS (stream);
966   return klass->flush_finish (stream, result, error);
967 }
968
969
970 /**
971  * g_output_stream_close_async:
972  * @stream: A #GOutputStream.
973  * @io_priority: the io priority of the request.
974  * @callback: callback to call when the request is satisfied
975  * @user_data: the data to pass to callback function
976  * @cancellable: optional cancellable object
977  *
978  * Requests an asynchronous close of the stream, releasing resources related to it.
979  * When the operation is finished @callback will be called, giving the results.
980  *
981  * For behaviour details see g_output_stream_close().
982  *
983  * The asyncronous methods have a default fallback that uses threads to implement
984  * asynchronicity, so they are optional for inheriting classes. However, if you
985  * override one you must override all.
986  **/
987 void
988 g_output_stream_close_async (GOutputStream      *stream,
989                              int                 io_priority,
990                              GCancellable       *cancellable,
991                              GAsyncReadyCallback callback,
992                              gpointer            user_data)
993 {
994   GOutputStreamClass *class;
995   GSimpleAsyncResult *simple;
996
997   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
998   
999   if (stream->priv->closed)
1000     {
1001       simple = g_simple_async_result_new (G_OBJECT (stream),
1002                                           callback,
1003                                           user_data,
1004                                           g_output_stream_close_async);
1005       g_simple_async_result_complete_in_idle (simple);
1006       g_object_unref (simple);
1007       return;
1008     }
1009
1010   if (stream->priv->pending)
1011     {
1012       g_simple_async_report_error_in_idle (G_OBJECT (stream),
1013                                            callback,
1014                                            user_data,
1015                                            G_IO_ERROR, G_IO_ERROR_PENDING,
1016                                            _("Stream has outstanding operation"));
1017       return;
1018     }
1019   
1020   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1021   stream->priv->pending = TRUE;
1022   stream->priv->outstanding_callback = callback;
1023   g_object_ref (stream);
1024   class->close_async (stream, io_priority, cancellable,
1025                       async_ready_close_callback_wrapper, user_data);
1026 }
1027
1028 /**
1029  * g_output_stream_close_finish:
1030  * @stream: a #GOutputStream.
1031  * @result: a #GAsyncResult.
1032  * @error: a #GError location to store the error occuring, or %NULL to 
1033  * ignore.
1034  * 
1035  * Closes an output stream.
1036  * 
1037  * Returns: %TRUE if stream was successfully closed, %FALSE otherwise.
1038  **/
1039 gboolean
1040 g_output_stream_close_finish (GOutputStream *stream,
1041                               GAsyncResult *result,
1042                               GError **error)
1043 {
1044   GSimpleAsyncResult *simple;
1045   GOutputStreamClass *class;
1046
1047   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1048   g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
1049
1050   if (G_IS_SIMPLE_ASYNC_RESULT (result))
1051     {
1052       simple = G_SIMPLE_ASYNC_RESULT (result);
1053       if (g_simple_async_result_propagate_error (simple, error))
1054         return FALSE;
1055
1056       /* Special case already closed */
1057       if (g_simple_async_result_get_source_tag (simple) == g_output_stream_close_async)
1058         return TRUE;
1059     }
1060
1061   class = G_OUTPUT_STREAM_GET_CLASS (stream);
1062   return class->close_finish (stream, result, error);
1063 }
1064
1065 /**
1066  * g_output_stream_is_closed:
1067  * @stream: a #GOutputStream.
1068  * 
1069  * Checks if an output stream has already been closed.
1070  * 
1071  * Returns: %TRUE if @stream is closed. %FALSE otherwise. 
1072  **/
1073 gboolean
1074 g_output_stream_is_closed (GOutputStream *stream)
1075 {
1076   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), TRUE);
1077   
1078   return stream->priv->closed;
1079 }
1080
1081 /**
1082  * g_output_stream_has_pending:
1083  * @stream: a #GOutputStream.
1084  * 
1085  * Checks if an ouput stream has pending actions.
1086  * 
1087  * Returns: %TRUE if @stream has pending actions. 
1088  **/
1089 gboolean
1090 g_output_stream_has_pending (GOutputStream *stream)
1091 {
1092   g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
1093   
1094   return stream->priv->pending;
1095 }
1096
1097 /**
1098  * g_output_stream_set_pending:
1099  * @stream: a #GOutputStream.
1100  * @pending: a #gboolean.
1101  * 
1102  * Sets the @stream as having pending actions if @pending is %TRUE. 
1103  **/
1104 void
1105 g_output_stream_set_pending (GOutputStream              *stream,
1106                             gboolean                   pending)
1107 {
1108   g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
1109   
1110   stream->priv->pending = pending;
1111 }
1112
1113
1114 /********************************************
1115  *   Default implementation of async ops    *
1116  ********************************************/
1117
1118 typedef struct {
1119   const void         *buffer;
1120   gsize               count_requested;
1121   gssize              count_written;
1122 } WriteData;
1123
1124 static void
1125 write_async_thread (GSimpleAsyncResult *res,
1126                    GObject *object,
1127                    GCancellable *cancellable)
1128 {
1129   WriteData *op;
1130   GOutputStreamClass *class;
1131   GError *error = NULL;
1132
1133   class = G_OUTPUT_STREAM_GET_CLASS (object);
1134   op = g_simple_async_result_get_op_res_gpointer (res);
1135   op->count_written = class->write (G_OUTPUT_STREAM (object), op->buffer, op->count_requested,
1136                                     cancellable, &error);
1137   if (op->count_written == -1)
1138     {
1139       g_simple_async_result_set_from_error (res, error);
1140       g_error_free (error);
1141     }
1142 }
1143
1144 static void
1145 g_output_stream_real_write_async (GOutputStream       *stream,
1146                                   const void          *buffer,
1147                                   gsize                count,
1148                                   int                  io_priority,
1149                                   GCancellable        *cancellable,
1150                                   GAsyncReadyCallback  callback,
1151                                   gpointer             user_data)
1152 {
1153   GSimpleAsyncResult *res;
1154   WriteData *op;
1155
1156   op = g_new0 (WriteData, 1);
1157   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async);
1158   g_simple_async_result_set_op_res_gpointer (res, op, g_free);
1159   op->buffer = buffer;
1160   op->count_requested = count;
1161   
1162   g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable);
1163   g_object_unref (res);
1164 }
1165
1166 static gssize
1167 g_output_stream_real_write_finish (GOutputStream *stream,
1168                                    GAsyncResult *result,
1169                                    GError **error)
1170 {
1171   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1172   WriteData *op;
1173
1174   g_assert (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_write_async);
1175   op = g_simple_async_result_get_op_res_gpointer (simple);
1176   return op->count_written;
1177 }
1178
1179 typedef struct {
1180   GInputStream *source;
1181   GOutputStreamSpliceFlags flags;
1182   gssize bytes_copied;
1183 } SpliceData;
1184
1185 static void
1186 splice_async_thread (GSimpleAsyncResult *result,
1187                      GObject *object,
1188                      GCancellable *cancellable)
1189 {
1190   SpliceData *op;
1191   GOutputStreamClass *class;
1192   GError *error = NULL;
1193   GOutputStream *stream;
1194
1195   stream = G_OUTPUT_STREAM (object);
1196   class = G_OUTPUT_STREAM_GET_CLASS (object);
1197   op = g_simple_async_result_get_op_res_gpointer (result);
1198   
1199   stream->priv->pending = FALSE;
1200   op->bytes_copied =
1201     g_output_stream_splice (stream,
1202                             op->source,
1203                             op->flags,
1204                             cancellable,
1205                             &error);
1206   stream->priv->pending = TRUE;
1207
1208   if (op->bytes_copied == -1)
1209     {
1210       g_simple_async_result_set_from_error (result, error);
1211       g_error_free (error);
1212     }
1213 }
1214
1215 static void
1216 g_output_stream_real_splice_async  (GOutputStream             *stream,
1217                                     GInputStream              *source,
1218                                     GOutputStreamSpliceFlags   flags,
1219                                     int                        io_priority,
1220                                     GCancellable              *cancellable,
1221                                     GAsyncReadyCallback        callback,
1222                                     gpointer                   user_data)
1223 {
1224   GSimpleAsyncResult *res;
1225   SpliceData *op;
1226
1227   op = g_new0 (SpliceData, 1);
1228   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_splice_async);
1229   g_simple_async_result_set_op_res_gpointer (res, op, g_free);
1230   op->flags = flags;
1231   op->source = source;
1232
1233   /* TODO: In the case where both source and destintion have
1234      non-threadbased async calls we can use a true async copy here */
1235   
1236   g_simple_async_result_run_in_thread (res, splice_async_thread, io_priority, cancellable);
1237   g_object_unref (res);
1238 }
1239
1240 static gssize
1241 g_output_stream_real_splice_finish (GOutputStream             *stream,
1242                                     GAsyncResult              *result,
1243                                     GError                   **error)
1244 {
1245   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1246   SpliceData *op;
1247
1248   g_assert (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_splice_async);
1249   op = g_simple_async_result_get_op_res_gpointer (simple);
1250   return op->bytes_copied;
1251 }
1252
1253
1254
1255 static void
1256 flush_async_thread (GSimpleAsyncResult *res,
1257                     GObject *object,
1258                     GCancellable *cancellable)
1259 {
1260   GOutputStreamClass *class;
1261   gboolean result;
1262   GError *error = NULL;
1263
1264   class = G_OUTPUT_STREAM_GET_CLASS (object);
1265   result = TRUE;
1266   if (class->flush)
1267     result = class->flush (G_OUTPUT_STREAM (object), cancellable, &error);
1268
1269   if (!result)
1270     {
1271       g_simple_async_result_set_from_error (res, error);
1272       g_error_free (error);
1273     }
1274 }
1275
1276 static void
1277 g_output_stream_real_flush_async (GOutputStream       *stream,
1278                                   int                  io_priority,
1279                                   GCancellable        *cancellable,
1280                                   GAsyncReadyCallback  callback,
1281                                   gpointer             user_data)
1282 {
1283   GSimpleAsyncResult *res;
1284
1285   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_write_async);
1286   
1287   g_simple_async_result_run_in_thread (res, flush_async_thread, io_priority, cancellable);
1288   g_object_unref (res);
1289 }
1290
1291 static gboolean
1292 g_output_stream_real_flush_finish (GOutputStream *stream,
1293                                    GAsyncResult *result,
1294                                    GError **error)
1295 {
1296   return TRUE;
1297 }
1298
1299 static void
1300 close_async_thread (GSimpleAsyncResult *res,
1301                     GObject *object,
1302                     GCancellable *cancellable)
1303 {
1304   GOutputStreamClass *class;
1305   GError *error = NULL;
1306   gboolean result;
1307
1308   /* Auto handling of cancelation disabled, and ignore
1309      cancellation, since we want to close things anyway, although
1310      possibly in a quick-n-dirty way. At least we never want to leak
1311      open handles */
1312   
1313   class = G_OUTPUT_STREAM_GET_CLASS (object);
1314   result = class->close (G_OUTPUT_STREAM (object), cancellable, &error);
1315   if (!result)
1316     {
1317       g_simple_async_result_set_from_error (res, error);
1318       g_error_free (error);
1319     }
1320 }
1321
1322 static void
1323 g_output_stream_real_close_async (GOutputStream      *stream,
1324                                   int                 io_priority,
1325                                   GCancellable       *cancellable,
1326                                   GAsyncReadyCallback callback,
1327                                   gpointer            user_data)
1328 {
1329   GSimpleAsyncResult *res;
1330   
1331   res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data, g_output_stream_real_close_async);
1332
1333   g_simple_async_result_set_handle_cancellation (res, FALSE);
1334   
1335   g_simple_async_result_run_in_thread (res, close_async_thread, io_priority, cancellable);
1336   g_object_unref (res);
1337 }
1338
1339 static gboolean
1340 g_output_stream_real_close_finish (GOutputStream              *stream,
1341                                    GAsyncResult              *result,
1342                                    GError                   **error)
1343 {
1344   GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
1345   g_assert (g_simple_async_result_get_source_tag (simple) == g_output_stream_real_close_async);
1346   return TRUE;
1347 }
1348
1349 #define __G_OUTPUT_STREAM_C__
1350 #include "gioaliasdef.c"