Revert "GIOScheduler: Avoid constant iteration over pending job list"
[platform/upstream/glib.git] / gio / gbufferedoutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Christian Kellner <gicmo@gnome.org> 
21  */
22
23 #include "config.h"
24 #include "gbufferedoutputstream.h"
25 #include "goutputstream.h"
26 #include "gseekable.h"
27 #include "gsimpleasyncresult.h"
28 #include "string.h"
29 #include "gioerror.h"
30 #include "glibintl.h"
31
32 /**
33  * SECTION:gbufferedoutputstream
34  * @short_description: Buffered Output Stream
35  * @include: gio/gio.h
36  * @see_also: #GFilterOutputStream, #GOutputStream
37  * 
38  * Buffered output stream implements #GFilterOutputStream and provides 
39  * for buffered writes. 
40  * 
41  * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes.
42  * 
43  * To create a buffered output stream, use g_buffered_output_stream_new(), 
44  * or g_buffered_output_stream_new_sized() to specify the buffer's size 
45  * at construction.
46  * 
47  * To get the size of a buffer within a buffered output stream, use 
48  * g_buffered_output_stream_get_buffer_size(). To change the size of a 
49  * buffered output stream's buffer, use 
50  * g_buffered_output_stream_set_buffer_size(). Note that the buffer's 
51  * size cannot be reduced below the size of the data within the buffer.
52  **/
53
54 #define DEFAULT_BUFFER_SIZE 4096
55
56 struct _GBufferedOutputStreamPrivate { /* per-instance private state, reached through stream->priv */
57   guint8 *buffer; /* heap buffer holding bytes not yet handed to the base stream; allocated in set_buffer_size, freed in finalize */
58   gsize   len; /* allocated size of buffer in bytes; the value reported as "buffer-size" */
59   goffset pos; /* number of bytes currently stored at the start of buffer; new data is appended at buffer + pos */
60   gboolean auto_grow; /* when TRUE, write enlarges the buffer instead of flushing once it is too small */
61 };
62
63 enum {
64   PROP_0,
65   PROP_BUFSIZE,
66   PROP_AUTO_GROW
67 };
68
69 static void     g_buffered_output_stream_set_property (GObject      *object,
70                                                        guint         prop_id,
71                                                        const GValue *value,
72                                                        GParamSpec   *pspec);
73
74 static void     g_buffered_output_stream_get_property (GObject    *object,
75                                                        guint       prop_id,
76                                                        GValue     *value,
77                                                        GParamSpec *pspec);
78 static void     g_buffered_output_stream_finalize     (GObject *object);
79
80
81 static gssize   g_buffered_output_stream_write        (GOutputStream *stream,
82                                                        const void    *buffer,
83                                                        gsize          count,
84                                                        GCancellable  *cancellable,
85                                                        GError       **error);
86 static gboolean g_buffered_output_stream_flush        (GOutputStream    *stream,
87                                                        GCancellable  *cancellable,
88                                                        GError          **error);
89 static gboolean g_buffered_output_stream_close        (GOutputStream  *stream,
90                                                        GCancellable   *cancellable,
91                                                        GError        **error);
92
93 static void     g_buffered_output_stream_flush_async  (GOutputStream        *stream,
94                                                        int                   io_priority,
95                                                        GCancellable         *cancellable,
96                                                        GAsyncReadyCallback   callback,
97                                                        gpointer              data);
98 static gboolean g_buffered_output_stream_flush_finish (GOutputStream        *stream,
99                                                        GAsyncResult         *result,
100                                                        GError              **error);
101 static void     g_buffered_output_stream_close_async  (GOutputStream        *stream,
102                                                        int                   io_priority,
103                                                        GCancellable         *cancellable,
104                                                        GAsyncReadyCallback   callback,
105                                                        gpointer              data);
106 static gboolean g_buffered_output_stream_close_finish (GOutputStream        *stream,
107                                                        GAsyncResult         *result,
108                                                        GError              **error);
109
110 static void     g_buffered_output_stream_seekable_iface_init (GSeekableIface  *iface);
111 static goffset  g_buffered_output_stream_tell                (GSeekable       *seekable);
112 static gboolean g_buffered_output_stream_can_seek            (GSeekable       *seekable);
113 static gboolean g_buffered_output_stream_seek                (GSeekable       *seekable,
114                                                               goffset          offset,
115                                                               GSeekType        type,
116                                                               GCancellable    *cancellable,
117                                                               GError         **error);
118 static gboolean g_buffered_output_stream_can_truncate        (GSeekable       *seekable);
119 static gboolean g_buffered_output_stream_truncate            (GSeekable       *seekable,
120                                                               goffset          offset,
121                                                               GCancellable    *cancellable,
122                                                               GError         **error);
123
124 G_DEFINE_TYPE_WITH_CODE (GBufferedOutputStream,
125                          g_buffered_output_stream,
126                          G_TYPE_FILTER_OUTPUT_STREAM,
127                          G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
128                                                 g_buffered_output_stream_seekable_iface_init))
129
130
131 static void
132 g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass)
133 {
134   GObjectClass *object_class;
135   GOutputStreamClass *ostream_class;
136  
137   g_type_class_add_private (klass, sizeof (GBufferedOutputStreamPrivate));
138
139   object_class = G_OBJECT_CLASS (klass);
140   object_class->get_property = g_buffered_output_stream_get_property;
141   object_class->set_property = g_buffered_output_stream_set_property;
142   object_class->finalize     = g_buffered_output_stream_finalize;
143
144   ostream_class = G_OUTPUT_STREAM_CLASS (klass);
145   ostream_class->write_fn = g_buffered_output_stream_write;
146   ostream_class->flush = g_buffered_output_stream_flush;
147   ostream_class->close_fn = g_buffered_output_stream_close;
148   ostream_class->flush_async  = g_buffered_output_stream_flush_async;
149   ostream_class->flush_finish = g_buffered_output_stream_flush_finish;
150   ostream_class->close_async  = g_buffered_output_stream_close_async;
151   ostream_class->close_finish = g_buffered_output_stream_close_finish;
152
153   g_object_class_install_property (object_class,
154                                    PROP_BUFSIZE,
155                                    g_param_spec_uint ("buffer-size",
156                                                       P_("Buffer Size"),
157                                                       P_("The size of the backend buffer"),
158                                                       1,
159                                                       G_MAXUINT,
160                                                       DEFAULT_BUFFER_SIZE,
161                                                       G_PARAM_READWRITE|G_PARAM_CONSTRUCT|
162                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
163
164   g_object_class_install_property (object_class,
165                                    PROP_AUTO_GROW,
166                                    g_param_spec_boolean ("auto-grow",
167                                                          P_("Auto-grow"),
168                                                          P_("Whether the buffer should automatically grow"),
169                                                          FALSE,
170                                                          G_PARAM_READWRITE|
171                                                          G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
172
173 }
174
175 /**
176  * g_buffered_output_stream_get_buffer_size:
177  * @stream: a #GBufferedOutputStream.
178  * 
179  * Gets the size of the buffer in the @stream.
180  * 
181  * Returns: the current size of the buffer.
182  **/
183 gsize
184 g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream)
185 {
186   g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1);
187
188   return stream->priv->len;
189 }
190
191 /**
192  * g_buffered_output_stream_set_buffer_size:
193  * @stream: a #GBufferedOutputStream.
194  * @size: a #gsize.
195  *
196  * Sets the size of the internal buffer to @size.
197  **/    
198 void
199 g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream,
200                                           gsize                  size)
201 {
202   GBufferedOutputStreamPrivate *priv;
203   guint8 *buffer;
204   
205   g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
206
207   priv = stream->priv;
208   
209   if (size == priv->len)
210     return;
211
212   if (priv->buffer)
213     {
214       size = MAX (size, priv->pos);
215
216       buffer = g_malloc (size);
217       memcpy (buffer, priv->buffer, priv->pos);
218       g_free (priv->buffer);
219       priv->buffer = buffer;
220       priv->len = size;
221       /* Keep old pos */
222     }
223   else
224     {
225       priv->buffer = g_malloc (size);
226       priv->len = size;
227       priv->pos = 0;
228     }
229
230   g_object_notify (G_OBJECT (stream), "buffer-size");
231 }
232
233 /**
234  * g_buffered_output_stream_get_auto_grow:
235  * @stream: a #GBufferedOutputStream.
236  * 
237  * Checks if the buffer automatically grows as data is added.
238  * 
239  * Returns: %TRUE if the @stream's buffer automatically grows,
240  * %FALSE otherwise.
241  **/  
242 gboolean
243 g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream)
244 {
245   g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE);
246
247   return stream->priv->auto_grow;
248 }
249
250 /**
251  * g_buffered_output_stream_set_auto_grow:
252  * @stream: a #GBufferedOutputStream.
253  * @auto_grow: a #gboolean.
254  *
255  * Sets whether or not the @stream's buffer should automatically grow.
256  * If @auto_grow is true, then each write will just make the buffer
257  * larger, and you must manually flush the buffer to actually write out
258  * the data to the underlying stream.
259  **/
260 void
261 g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream,
262                                         gboolean               auto_grow)
263 {
264   GBufferedOutputStreamPrivate *priv;
265   g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
266   priv = stream->priv;
267   auto_grow = auto_grow != FALSE;
268   if (priv->auto_grow != auto_grow)
269     {
270       priv->auto_grow = auto_grow;
271       g_object_notify (G_OBJECT (stream), "auto-grow");
272     }
273 }
274
275 static void
276 g_buffered_output_stream_set_property (GObject      *object,
277                                        guint         prop_id,
278                                        const GValue *value,
279                                        GParamSpec   *pspec)
280 {
281   GBufferedOutputStream *stream;
282
283   stream = G_BUFFERED_OUTPUT_STREAM (object);
284
285   switch (prop_id) 
286     {
287     case PROP_BUFSIZE:
288       g_buffered_output_stream_set_buffer_size (stream, g_value_get_uint (value));
289       break;    
290
291     case PROP_AUTO_GROW:
292       g_buffered_output_stream_set_auto_grow (stream, g_value_get_boolean (value));
293       break;
294
295     default:
296       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
297       break;
298     }
299
300 }
301
302 static void
303 g_buffered_output_stream_get_property (GObject    *object,
304                                        guint       prop_id,
305                                        GValue     *value,
306                                        GParamSpec *pspec)
307 {
308   GBufferedOutputStream *buffered_stream;
309   GBufferedOutputStreamPrivate *priv;
310
311   buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
312   priv = buffered_stream->priv;
313
314   switch (prop_id)
315     {
316     case PROP_BUFSIZE:
317       g_value_set_uint (value, priv->len);
318       break;
319
320     case PROP_AUTO_GROW:
321       g_value_set_boolean (value, priv->auto_grow);
322       break;
323
324     default:
325       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
326       break;
327     }
328
329 }
330
331 static void
332 g_buffered_output_stream_finalize (GObject *object)
333 {
334   GBufferedOutputStream *stream;
335   GBufferedOutputStreamPrivate *priv;
336
337   stream = G_BUFFERED_OUTPUT_STREAM (object);
338   priv = stream->priv;
339
340   g_free (priv->buffer);
341
342   G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize (object);
343 }
344
345 static void
346 g_buffered_output_stream_init (GBufferedOutputStream *stream)
347 {
348   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
349                                               G_TYPE_BUFFERED_OUTPUT_STREAM,
350                                               GBufferedOutputStreamPrivate);
351
352 }
353
354 static void
355 g_buffered_output_stream_seekable_iface_init (GSeekableIface *iface)
356 {
357   iface->tell         = g_buffered_output_stream_tell;
358   iface->can_seek     = g_buffered_output_stream_can_seek;
359   iface->seek         = g_buffered_output_stream_seek;
360   iface->can_truncate = g_buffered_output_stream_can_truncate;
361   iface->truncate_fn  = g_buffered_output_stream_truncate;
362 }
363
364 /**
365  * g_buffered_output_stream_new:
366  * @base_stream: a #GOutputStream.
367  * 
368  * Creates a new buffered output stream for a base stream.
369  * 
370  * Returns: a #GOutputStream for the given @base_stream.
371  **/  
372 GOutputStream *
373 g_buffered_output_stream_new (GOutputStream *base_stream)
374 {
375   GOutputStream *stream;
376
377   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
378
379   stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
380                          "base-stream", base_stream,
381                          NULL);
382
383   return stream;
384 }
385
386 /**
387  * g_buffered_output_stream_new_sized:
388  * @base_stream: a #GOutputStream.
389  * @size: a #gsize.
390  * 
391  * Creates a new buffered output stream with a given buffer size.
392  * 
393  * Returns: a #GOutputStream with an internal buffer set to @size.
394  **/  
395 GOutputStream *
396 g_buffered_output_stream_new_sized (GOutputStream *base_stream,
397                                     gsize          size)
398 {
399   GOutputStream *stream;
400
401   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
402
403   stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
404                          "base-stream", base_stream,
405                          "buffer-size", size,
406                          NULL);
407
408   return stream;
409 }
410
411 static gboolean
412 flush_buffer (GBufferedOutputStream  *stream,
413               GCancellable           *cancellable,
414               GError                 **error)
415 {
416   GBufferedOutputStreamPrivate *priv;
417   GOutputStream                *base_stream;
418   gboolean                      res;
419   gsize                         bytes_written;
420   gsize                         count;
421
422   priv = stream->priv;
423   bytes_written = 0;
424   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
425
426   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE);
427
428   res = g_output_stream_write_all (base_stream,
429                                    priv->buffer,
430                                    priv->pos,
431                                    &bytes_written,
432                                    cancellable,
433                                    error);
434
435   count = priv->pos - bytes_written;
436
437   if (count > 0)
438     g_memmove (priv->buffer, priv->buffer + bytes_written, count);
439   
440   priv->pos -= bytes_written;
441
442   return res;
443 }
444
445 static gssize
446 g_buffered_output_stream_write  (GOutputStream *stream,
447                                  const void    *buffer,
448                                  gsize          count,
449                                  GCancellable  *cancellable,
450                                  GError       **error)
451 {
452   GBufferedOutputStream        *bstream;
453   GBufferedOutputStreamPrivate *priv;
454   gboolean res;
455   gsize    n;
456   gsize new_size;
457
458   bstream = G_BUFFERED_OUTPUT_STREAM (stream);
459   priv = bstream->priv;
460
461   n = priv->len - priv->pos;
462
463   if (priv->auto_grow && n < count)
464     {
465       new_size = MAX (priv->len * 2, priv->len + count);
466       g_buffered_output_stream_set_buffer_size (bstream, new_size);
467     }
468   else if (n == 0)
469     {
470       res = flush_buffer (bstream, cancellable, error);
471       
472       if (res == FALSE)
473         return -1;
474     }
475
476   n = priv->len - priv->pos;
477   
478   count = MIN (count, n);
479   memcpy (priv->buffer + priv->pos, buffer, count);
480   priv->pos += count;
481
482   return count;
483 }
484
485 static gboolean
486 g_buffered_output_stream_flush (GOutputStream  *stream,
487                                 GCancellable   *cancellable,
488                                 GError        **error)
489 {
490   GBufferedOutputStream *bstream;
491   GOutputStream                *base_stream;
492   gboolean res;
493
494   bstream = G_BUFFERED_OUTPUT_STREAM (stream);
495   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
496
497   res = flush_buffer (bstream, cancellable, error);
498
499   if (res == FALSE)
500     return FALSE;
501
502   res = g_output_stream_flush (base_stream, cancellable, error);
503
504   return res;
505 }
506
507 static gboolean
508 g_buffered_output_stream_close (GOutputStream  *stream,
509                                 GCancellable   *cancellable,
510                                 GError        **error)
511 {
512   GBufferedOutputStream        *bstream;
513   GOutputStream                *base_stream;
514   gboolean                      res;
515
516   bstream = G_BUFFERED_OUTPUT_STREAM (stream);
517   base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream;
518   res = flush_buffer (bstream, cancellable, error);
519
520   if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream)))
521     {
522       /* report the first error but still close the stream */
523       if (res)
524         res = g_output_stream_close (base_stream, cancellable, error);
525       else
526         g_output_stream_close (base_stream, cancellable, NULL);
527     }
528
529   return res;
530 }
531
532 static goffset
533 g_buffered_output_stream_tell (GSeekable *seekable)
534 {
535   GBufferedOutputStream        *bstream;
536   GBufferedOutputStreamPrivate *priv;
537   GOutputStream *base_stream;
538   GSeekable    *base_stream_seekable;
539   goffset base_offset;
540   
541   bstream = G_BUFFERED_OUTPUT_STREAM (seekable);
542   priv = bstream->priv;
543
544   base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
545   if (!G_IS_SEEKABLE (base_stream))
546     return 0;
547
548   base_stream_seekable = G_SEEKABLE (base_stream);
549   
550   base_offset = g_seekable_tell (base_stream_seekable);
551   return base_offset + priv->pos;
552 }
553
554 static gboolean
555 g_buffered_output_stream_can_seek (GSeekable *seekable)
556 {
557   GOutputStream *base_stream;
558   
559   base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
560   return G_IS_SEEKABLE (base_stream) && g_seekable_can_seek (G_SEEKABLE (base_stream));
561 }
562
563 static gboolean
564 g_buffered_output_stream_seek (GSeekable     *seekable,
565                                goffset        offset,
566                                GSeekType      type,
567                                GCancellable  *cancellable,
568                                GError       **error)
569 {
570   GBufferedOutputStream *bstream;
571   GOutputStream *base_stream;
572   GSeekable *base_stream_seekable;
573   gboolean flushed;
574
575   bstream = G_BUFFERED_OUTPUT_STREAM (seekable);
576
577   base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
578   if (!G_IS_SEEKABLE (base_stream))
579     {
580       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
581                            _("Seek not supported on base stream"));
582       return FALSE;
583     }
584
585   base_stream_seekable = G_SEEKABLE (base_stream);
586   flushed = flush_buffer (bstream, cancellable, error);
587   if (!flushed)
588     return FALSE;
589
590   return g_seekable_seek (base_stream_seekable, offset, type, cancellable, error);
591 }
592
593 static gboolean
594 g_buffered_output_stream_can_truncate (GSeekable *seekable)
595 {
596   GOutputStream *base_stream;
597   
598   base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
599   return G_IS_SEEKABLE (base_stream) && g_seekable_can_truncate (G_SEEKABLE (base_stream));
600 }
601
602 static gboolean
603 g_buffered_output_stream_truncate (GSeekable     *seekable,
604                                    goffset        offset,
605                                    GCancellable  *cancellable,
606                                    GError       **error)
607 {
608   GBufferedOutputStream        *bstream;
609   GOutputStream *base_stream;
610   GSeekable *base_stream_seekable;
611   gboolean flushed;
612
613   bstream = G_BUFFERED_OUTPUT_STREAM (seekable);
614   base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
615   if (!G_IS_SEEKABLE (base_stream))
616     {
617       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
618                            _("Truncate not supported on base stream"));
619       return FALSE;
620     }
621
622   base_stream_seekable = G_SEEKABLE (base_stream);
623
624   flushed = flush_buffer (bstream, cancellable, error);
625   if (!flushed)
626     return FALSE;
627   return g_seekable_truncate (base_stream_seekable, offset, cancellable, error);
628 }
629
630 /* ************************** */
631 /* Async stuff implementation */
632 /* ************************** */
633
634 /* TODO: This should be using the base class async ops, not threads */
635
636 typedef struct { /* options handed to flush_buffer_thread through the async result */
637
638   guint flush_stream : 1; /* when set, the base stream is flushed after the buffer was written out successfully */
639   guint close_stream : 1; /* when set, the base stream is closed afterwards if the filter stream is set to close it */
640
641 } FlushData;
642
643 static void
644 free_flush_data (gpointer data)
645 {
646   g_slice_free (FlushData, data);
647 }
648
649 /* This function is used by all three (i.e. 
650  * _write, _flush, _close) functions since
651  * all of them will need to flush the buffer
652  * and so closing and writing is just a special
653  * case of flushing + some addition stuff */
654 static void
655 flush_buffer_thread (GSimpleAsyncResult *result,
656                      GObject            *object,
657                      GCancellable       *cancellable)
658 {
659   GBufferedOutputStream *stream;
660   GOutputStream *base_stream;
661   FlushData     *fdata;
662   gboolean       res;
663   GError        *error = NULL;
664
665   stream = G_BUFFERED_OUTPUT_STREAM (object);
666   fdata = g_simple_async_result_get_op_res_gpointer (result);
667   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
668
669   res = flush_buffer (stream, cancellable, &error);
670
671   /* if flushing the buffer didn't work don't even bother
672    * to flush the stream but just report that error */
673   if (res && fdata->flush_stream)
674     res = g_output_stream_flush (base_stream, cancellable, &error);
675
676   if (fdata->close_stream) 
677     {
678      
679       /* if flushing the buffer or the stream returned 
680        * an error report that first error but still try 
681        * close the stream */
682       if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream)))
683         {
684           if (res == FALSE)
685             g_output_stream_close (base_stream, cancellable, NULL);
686           else 
687             res = g_output_stream_close (base_stream, cancellable, &error);
688         }
689     }
690
691   if (res == FALSE)
692     g_simple_async_result_take_error (result, error);
693 }
694
695 static void
696 g_buffered_output_stream_flush_async (GOutputStream        *stream,
697                                       int                   io_priority,
698                                       GCancellable         *cancellable,
699                                       GAsyncReadyCallback   callback,
700                                       gpointer              data)
701 {
702   GSimpleAsyncResult *res;
703   FlushData          *fdata;
704
705   fdata = g_slice_new (FlushData);
706   fdata->flush_stream = TRUE;
707   fdata->close_stream = FALSE;
708
709   res = g_simple_async_result_new (G_OBJECT (stream),
710                                    callback,
711                                    data,
712                                    g_buffered_output_stream_flush_async);
713
714   g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
715
716   g_simple_async_result_run_in_thread (res, 
717                                        flush_buffer_thread, 
718                                        io_priority,
719                                        cancellable);
720   g_object_unref (res);
721 }
722
723 static gboolean
724 g_buffered_output_stream_flush_finish (GOutputStream        *stream,
725                                        GAsyncResult         *result,
726                                        GError              **error)
727 {
728   GSimpleAsyncResult *simple;
729
730   simple = G_SIMPLE_ASYNC_RESULT (result);
731
732   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == 
733             g_buffered_output_stream_flush_async);
734
735   return TRUE;
736 }
737
738 static void
739 g_buffered_output_stream_close_async (GOutputStream        *stream,
740                                       int                   io_priority,
741                                       GCancellable         *cancellable,
742                                       GAsyncReadyCallback   callback,
743                                       gpointer              data)
744 {
745   GSimpleAsyncResult *res;
746   FlushData          *fdata;
747
748   fdata = g_slice_new (FlushData);
749   fdata->close_stream = TRUE;
750
751   res = g_simple_async_result_new (G_OBJECT (stream),
752                                    callback,
753                                    data,
754                                    g_buffered_output_stream_close_async);
755
756   g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
757
758   g_simple_async_result_run_in_thread (res, 
759                                        flush_buffer_thread, 
760                                        io_priority,
761                                        cancellable);
762   g_object_unref (res);
763 }
764
765 static gboolean
766 g_buffered_output_stream_close_finish (GOutputStream        *stream,
767                                        GAsyncResult         *result,
768                                        GError              **error)
769 {
770   GSimpleAsyncResult *simple;
771
772   simple = G_SIMPLE_ASYNC_RESULT (result);
773
774   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == 
775             g_buffered_output_stream_close_async);
776
777   return TRUE;
778 }