Change LGPL-2.1+ to LGPL-2.1-or-later
[platform/upstream/glib.git] / gio / gbufferedoutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * SPDX-License-Identifier: LGPL-2.1-or-later
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General
18  * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
19  *
20  * Author: Christian Kellner <gicmo@gnome.org> 
21  */
22
23 #include "config.h"
24 #include "gbufferedoutputstream.h"
25 #include "goutputstream.h"
26 #include "gseekable.h"
27 #include "gtask.h"
28 #include "string.h"
29 #include "gioerror.h"
30 #include "glibintl.h"
31
32 /**
33  * SECTION:gbufferedoutputstream
34  * @short_description: Buffered Output Stream
35  * @include: gio/gio.h
36  * @see_also: #GFilterOutputStream, #GOutputStream
37  * 
38  * Buffered output stream implements #GFilterOutputStream and provides 
39  * for buffered writes. 
40  * 
41  * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes.
42  * 
43  * To create a buffered output stream, use g_buffered_output_stream_new(), 
44  * or g_buffered_output_stream_new_sized() to specify the buffer's size 
45  * at construction.
46  * 
47  * To get the size of a buffer within a buffered output stream, use 
48  * g_buffered_output_stream_get_buffer_size(). To change the size of a 
49  * buffered output stream's buffer, use 
50  * g_buffered_output_stream_set_buffer_size(). Note that the buffer's 
51  * size cannot be reduced below the size of the data within the buffer.
52  **/
53
54 #define DEFAULT_BUFFER_SIZE 4096
55
56 struct _GBufferedOutputStreamPrivate {
57   guint8 *buffer; 
58   gsize   len;
59   goffset pos;
60   gboolean auto_grow;
61 };
62
/* GObject property identifiers; PROP_0 is the unused zero slot. */
enum
{
  PROP_0,
  PROP_BUFSIZE,   /* "buffer-size" */
  PROP_AUTO_GROW  /* "auto-grow" */
};
68
69 static void     g_buffered_output_stream_set_property (GObject      *object,
70                                                        guint         prop_id,
71                                                        const GValue *value,
72                                                        GParamSpec   *pspec);
73
74 static void     g_buffered_output_stream_get_property (GObject    *object,
75                                                        guint       prop_id,
76                                                        GValue     *value,
77                                                        GParamSpec *pspec);
78 static void     g_buffered_output_stream_finalize     (GObject *object);
79
80
81 static gssize   g_buffered_output_stream_write        (GOutputStream *stream,
82                                                        const void    *buffer,
83                                                        gsize          count,
84                                                        GCancellable  *cancellable,
85                                                        GError       **error);
86 static gboolean g_buffered_output_stream_flush        (GOutputStream    *stream,
87                                                        GCancellable  *cancellable,
88                                                        GError          **error);
89 static gboolean g_buffered_output_stream_close        (GOutputStream  *stream,
90                                                        GCancellable   *cancellable,
91                                                        GError        **error);
92
93 static void     g_buffered_output_stream_flush_async  (GOutputStream        *stream,
94                                                        int                   io_priority,
95                                                        GCancellable         *cancellable,
96                                                        GAsyncReadyCallback   callback,
97                                                        gpointer              data);
98 static gboolean g_buffered_output_stream_flush_finish (GOutputStream        *stream,
99                                                        GAsyncResult         *result,
100                                                        GError              **error);
101 static void     g_buffered_output_stream_close_async  (GOutputStream        *stream,
102                                                        int                   io_priority,
103                                                        GCancellable         *cancellable,
104                                                        GAsyncReadyCallback   callback,
105                                                        gpointer              data);
106 static gboolean g_buffered_output_stream_close_finish (GOutputStream        *stream,
107                                                        GAsyncResult         *result,
108                                                        GError              **error);
109
110 static void     g_buffered_output_stream_seekable_iface_init (GSeekableIface  *iface);
111 static goffset  g_buffered_output_stream_tell                (GSeekable       *seekable);
112 static gboolean g_buffered_output_stream_can_seek            (GSeekable       *seekable);
113 static gboolean g_buffered_output_stream_seek                (GSeekable       *seekable,
114                                                               goffset          offset,
115                                                               GSeekType        type,
116                                                               GCancellable    *cancellable,
117                                                               GError         **error);
118 static gboolean g_buffered_output_stream_can_truncate        (GSeekable       *seekable);
119 static gboolean g_buffered_output_stream_truncate            (GSeekable       *seekable,
120                                                               goffset          offset,
121                                                               GCancellable    *cancellable,
122                                                               GError         **error);
123
124 G_DEFINE_TYPE_WITH_CODE (GBufferedOutputStream,
125                          g_buffered_output_stream,
126                          G_TYPE_FILTER_OUTPUT_STREAM,
127                          G_ADD_PRIVATE (GBufferedOutputStream)
128                          G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
129                                                 g_buffered_output_stream_seekable_iface_init))
130
131
132 static void
133 g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass)
134 {
135   GObjectClass *object_class;
136   GOutputStreamClass *ostream_class;
137
138   object_class = G_OBJECT_CLASS (klass);
139   object_class->get_property = g_buffered_output_stream_get_property;
140   object_class->set_property = g_buffered_output_stream_set_property;
141   object_class->finalize     = g_buffered_output_stream_finalize;
142
143   ostream_class = G_OUTPUT_STREAM_CLASS (klass);
144   ostream_class->write_fn = g_buffered_output_stream_write;
145   ostream_class->flush = g_buffered_output_stream_flush;
146   ostream_class->close_fn = g_buffered_output_stream_close;
147   ostream_class->flush_async  = g_buffered_output_stream_flush_async;
148   ostream_class->flush_finish = g_buffered_output_stream_flush_finish;
149   ostream_class->close_async  = g_buffered_output_stream_close_async;
150   ostream_class->close_finish = g_buffered_output_stream_close_finish;
151
152   g_object_class_install_property (object_class,
153                                    PROP_BUFSIZE,
154                                    g_param_spec_uint ("buffer-size",
155                                                       P_("Buffer Size"),
156                                                       P_("The size of the backend buffer"),
157                                                       1,
158                                                       G_MAXUINT,
159                                                       DEFAULT_BUFFER_SIZE,
160                                                       G_PARAM_READWRITE|G_PARAM_CONSTRUCT|
161                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
162
163   g_object_class_install_property (object_class,
164                                    PROP_AUTO_GROW,
165                                    g_param_spec_boolean ("auto-grow",
166                                                          P_("Auto-grow"),
167                                                          P_("Whether the buffer should automatically grow"),
168                                                          FALSE,
169                                                          G_PARAM_READWRITE|
170                                                          G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
171
172 }
173
174 /**
175  * g_buffered_output_stream_get_buffer_size:
176  * @stream: a #GBufferedOutputStream.
177  * 
178  * Gets the size of the buffer in the @stream.
179  * 
180  * Returns: the current size of the buffer.
181  **/
182 gsize
183 g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream)
184 {
185   g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1);
186
187   return stream->priv->len;
188 }
189
190 /**
191  * g_buffered_output_stream_set_buffer_size:
192  * @stream: a #GBufferedOutputStream.
193  * @size: a #gsize.
194  *
195  * Sets the size of the internal buffer to @size.
196  **/    
197 void
198 g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream,
199                                           gsize                  size)
200 {
201   GBufferedOutputStreamPrivate *priv;
202   guint8 *buffer;
203   
204   g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
205
206   priv = stream->priv;
207   
208   if (size == priv->len)
209     return;
210
211   if (priv->buffer)
212     {
213       size = (priv->pos > 0) ? MAX (size, (gsize) priv->pos) : size;
214
215       buffer = g_malloc (size);
216       memcpy (buffer, priv->buffer, priv->pos);
217       g_free (priv->buffer);
218       priv->buffer = buffer;
219       priv->len = size;
220       /* Keep old pos */
221     }
222   else
223     {
224       priv->buffer = g_malloc (size);
225       priv->len = size;
226       priv->pos = 0;
227     }
228
229   g_object_notify (G_OBJECT (stream), "buffer-size");
230 }
231
232 /**
233  * g_buffered_output_stream_get_auto_grow:
234  * @stream: a #GBufferedOutputStream.
235  * 
236  * Checks if the buffer automatically grows as data is added.
237  * 
238  * Returns: %TRUE if the @stream's buffer automatically grows,
239  * %FALSE otherwise.
240  **/  
241 gboolean
242 g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream)
243 {
244   g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE);
245
246   return stream->priv->auto_grow;
247 }
248
249 /**
250  * g_buffered_output_stream_set_auto_grow:
251  * @stream: a #GBufferedOutputStream.
252  * @auto_grow: a #gboolean.
253  *
254  * Sets whether or not the @stream's buffer should automatically grow.
255  * If @auto_grow is true, then each write will just make the buffer
256  * larger, and you must manually flush the buffer to actually write out
257  * the data to the underlying stream.
258  **/
259 void
260 g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream,
261                                         gboolean               auto_grow)
262 {
263   GBufferedOutputStreamPrivate *priv;
264   g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
265   priv = stream->priv;
266   auto_grow = auto_grow != FALSE;
267   if (priv->auto_grow != auto_grow)
268     {
269       priv->auto_grow = auto_grow;
270       g_object_notify (G_OBJECT (stream), "auto-grow");
271     }
272 }
273
274 static void
275 g_buffered_output_stream_set_property (GObject      *object,
276                                        guint         prop_id,
277                                        const GValue *value,
278                                        GParamSpec   *pspec)
279 {
280   GBufferedOutputStream *stream;
281
282   stream = G_BUFFERED_OUTPUT_STREAM (object);
283
284   switch (prop_id) 
285     {
286     case PROP_BUFSIZE:
287       g_buffered_output_stream_set_buffer_size (stream, g_value_get_uint (value));
288       break;    
289
290     case PROP_AUTO_GROW:
291       g_buffered_output_stream_set_auto_grow (stream, g_value_get_boolean (value));
292       break;
293
294     default:
295       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
296       break;
297     }
298
299 }
300
301 static void
302 g_buffered_output_stream_get_property (GObject    *object,
303                                        guint       prop_id,
304                                        GValue     *value,
305                                        GParamSpec *pspec)
306 {
307   GBufferedOutputStream *buffered_stream;
308   GBufferedOutputStreamPrivate *priv;
309
310   buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
311   priv = buffered_stream->priv;
312
313   switch (prop_id)
314     {
315     case PROP_BUFSIZE:
316       g_value_set_uint (value, priv->len);
317       break;
318
319     case PROP_AUTO_GROW:
320       g_value_set_boolean (value, priv->auto_grow);
321       break;
322
323     default:
324       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
325       break;
326     }
327
328 }
329
330 static void
331 g_buffered_output_stream_finalize (GObject *object)
332 {
333   GBufferedOutputStream *stream;
334   GBufferedOutputStreamPrivate *priv;
335
336   stream = G_BUFFERED_OUTPUT_STREAM (object);
337   priv = stream->priv;
338
339   g_free (priv->buffer);
340
341   G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize (object);
342 }
343
344 static void
345 g_buffered_output_stream_init (GBufferedOutputStream *stream)
346 {
347   stream->priv = g_buffered_output_stream_get_instance_private (stream);
348 }
349
350 static void
351 g_buffered_output_stream_seekable_iface_init (GSeekableIface *iface)
352 {
353   iface->tell         = g_buffered_output_stream_tell;
354   iface->can_seek     = g_buffered_output_stream_can_seek;
355   iface->seek         = g_buffered_output_stream_seek;
356   iface->can_truncate = g_buffered_output_stream_can_truncate;
357   iface->truncate_fn  = g_buffered_output_stream_truncate;
358 }
359
360 /**
361  * g_buffered_output_stream_new:
362  * @base_stream: a #GOutputStream.
363  * 
364  * Creates a new buffered output stream for a base stream.
365  * 
366  * Returns: a #GOutputStream for the given @base_stream.
367  **/  
368 GOutputStream *
369 g_buffered_output_stream_new (GOutputStream *base_stream)
370 {
371   GOutputStream *stream;
372
373   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
374
375   stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
376                          "base-stream", base_stream,
377                          NULL);
378
379   return stream;
380 }
381
382 /**
383  * g_buffered_output_stream_new_sized:
384  * @base_stream: a #GOutputStream.
385  * @size: a #gsize.
386  * 
387  * Creates a new buffered output stream with a given buffer size.
388  * 
389  * Returns: a #GOutputStream with an internal buffer set to @size.
390  **/  
391 GOutputStream *
392 g_buffered_output_stream_new_sized (GOutputStream *base_stream,
393                                     gsize          size)
394 {
395   GOutputStream *stream;
396
397   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
398
399   stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
400                          "base-stream", base_stream,
401                          "buffer-size", size,
402                          NULL);
403
404   return stream;
405 }
406
407 static gboolean
408 flush_buffer (GBufferedOutputStream  *stream,
409               GCancellable           *cancellable,
410               GError                 **error)
411 {
412   GBufferedOutputStreamPrivate *priv;
413   GOutputStream                *base_stream;
414   gboolean                      res;
415   gsize                         bytes_written;
416   gsize                         count;
417
418   priv = stream->priv;
419   bytes_written = 0;
420   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
421
422   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE);
423
424   res = g_output_stream_write_all (base_stream,
425                                    priv->buffer,
426                                    priv->pos,
427                                    &bytes_written,
428                                    cancellable,
429                                    error);
430
431   count = priv->pos - bytes_written;
432
433   if (count > 0)
434     memmove (priv->buffer, priv->buffer + bytes_written, count);
435   
436   priv->pos -= bytes_written;
437
438   return res;
439 }
440
441 static gssize
442 g_buffered_output_stream_write  (GOutputStream *stream,
443                                  const void    *buffer,
444                                  gsize          count,
445                                  GCancellable  *cancellable,
446                                  GError       **error)
447 {
448   GBufferedOutputStream        *bstream;
449   GBufferedOutputStreamPrivate *priv;
450   gboolean res;
451   gsize    n;
452   gsize new_size;
453
454   bstream = G_BUFFERED_OUTPUT_STREAM (stream);
455   priv = bstream->priv;
456
457   n = priv->len - priv->pos;
458
459   if (priv->auto_grow && n < count)
460     {
461       new_size = MAX (priv->len * 2, priv->len + count);
462       g_buffered_output_stream_set_buffer_size (bstream, new_size);
463     }
464   else if (n == 0)
465     {
466       res = flush_buffer (bstream, cancellable, error);
467       
468       if (res == FALSE)
469         return -1;
470     }
471
472   n = priv->len - priv->pos;
473   
474   count = MIN (count, n);
475   memcpy (priv->buffer + priv->pos, buffer, count);
476   priv->pos += count;
477
478   return count;
479 }
480
481 static gboolean
482 g_buffered_output_stream_flush (GOutputStream  *stream,
483                                 GCancellable   *cancellable,
484                                 GError        **error)
485 {
486   GBufferedOutputStream *bstream;
487   GOutputStream                *base_stream;
488   gboolean res;
489
490   bstream = G_BUFFERED_OUTPUT_STREAM (stream);
491   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
492
493   res = flush_buffer (bstream, cancellable, error);
494
495   if (res == FALSE)
496     return FALSE;
497
498   res = g_output_stream_flush (base_stream, cancellable, error);
499
500   return res;
501 }
502
503 static gboolean
504 g_buffered_output_stream_close (GOutputStream  *stream,
505                                 GCancellable   *cancellable,
506                                 GError        **error)
507 {
508   GBufferedOutputStream        *bstream;
509   GOutputStream                *base_stream;
510   gboolean                      res;
511
512   bstream = G_BUFFERED_OUTPUT_STREAM (stream);
513   base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream;
514   res = flush_buffer (bstream, cancellable, error);
515
516   if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream)))
517     {
518       /* report the first error but still close the stream */
519       if (res)
520         res = g_output_stream_close (base_stream, cancellable, error);
521       else
522         g_output_stream_close (base_stream, cancellable, NULL);
523     }
524
525   return res;
526 }
527
528 static goffset
529 g_buffered_output_stream_tell (GSeekable *seekable)
530 {
531   GBufferedOutputStream        *bstream;
532   GBufferedOutputStreamPrivate *priv;
533   GOutputStream *base_stream;
534   GSeekable    *base_stream_seekable;
535   goffset base_offset;
536   
537   bstream = G_BUFFERED_OUTPUT_STREAM (seekable);
538   priv = bstream->priv;
539
540   base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
541   if (!G_IS_SEEKABLE (base_stream))
542     return 0;
543
544   base_stream_seekable = G_SEEKABLE (base_stream);
545   
546   base_offset = g_seekable_tell (base_stream_seekable);
547   return base_offset + priv->pos;
548 }
549
550 static gboolean
551 g_buffered_output_stream_can_seek (GSeekable *seekable)
552 {
553   GOutputStream *base_stream;
554   
555   base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
556   return G_IS_SEEKABLE (base_stream) && g_seekable_can_seek (G_SEEKABLE (base_stream));
557 }
558
559 static gboolean
560 g_buffered_output_stream_seek (GSeekable     *seekable,
561                                goffset        offset,
562                                GSeekType      type,
563                                GCancellable  *cancellable,
564                                GError       **error)
565 {
566   GBufferedOutputStream *bstream;
567   GOutputStream *base_stream;
568   GSeekable *base_stream_seekable;
569   gboolean flushed;
570
571   bstream = G_BUFFERED_OUTPUT_STREAM (seekable);
572
573   base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
574   if (!G_IS_SEEKABLE (base_stream))
575     {
576       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
577                            _("Seek not supported on base stream"));
578       return FALSE;
579     }
580
581   base_stream_seekable = G_SEEKABLE (base_stream);
582   flushed = flush_buffer (bstream, cancellable, error);
583   if (!flushed)
584     return FALSE;
585
586   return g_seekable_seek (base_stream_seekable, offset, type, cancellable, error);
587 }
588
589 static gboolean
590 g_buffered_output_stream_can_truncate (GSeekable *seekable)
591 {
592   GOutputStream *base_stream;
593   
594   base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
595   return G_IS_SEEKABLE (base_stream) && g_seekable_can_truncate (G_SEEKABLE (base_stream));
596 }
597
598 static gboolean
599 g_buffered_output_stream_truncate (GSeekable     *seekable,
600                                    goffset        offset,
601                                    GCancellable  *cancellable,
602                                    GError       **error)
603 {
604   GBufferedOutputStream        *bstream;
605   GOutputStream *base_stream;
606   GSeekable *base_stream_seekable;
607   gboolean flushed;
608
609   bstream = G_BUFFERED_OUTPUT_STREAM (seekable);
610   base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
611   if (!G_IS_SEEKABLE (base_stream))
612     {
613       g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
614                            _("Truncate not supported on base stream"));
615       return FALSE;
616     }
617
618   base_stream_seekable = G_SEEKABLE (base_stream);
619
620   flushed = flush_buffer (bstream, cancellable, error);
621   if (!flushed)
622     return FALSE;
623   return g_seekable_truncate (base_stream_seekable, offset, cancellable, error);
624 }
625
626 /* ************************** */
627 /* Async stuff implementation */
628 /* ************************** */
629
630 /* TODO: This should be using the base class async ops, not threads */
631
632 typedef struct {
633
634   guint flush_stream : 1;
635   guint close_stream : 1;
636
637 } FlushData;
638
639 static void
640 free_flush_data (gpointer data)
641 {
642   g_slice_free (FlushData, data);
643 }
644
645 /* This function is used by all three (i.e. 
646  * _write, _flush, _close) functions since
647  * all of them will need to flush the buffer
648  * and so closing and writing is just a special
649  * case of flushing + some addition stuff */
650 static void
651 flush_buffer_thread (GTask        *task,
652                      gpointer      object,
653                      gpointer      task_data,
654                      GCancellable *cancellable)
655 {
656   GBufferedOutputStream *stream;
657   GOutputStream *base_stream;
658   FlushData     *fdata;
659   gboolean       res;
660   GError        *error = NULL;
661
662   stream = G_BUFFERED_OUTPUT_STREAM (object);
663   fdata = task_data;
664   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
665
666   res = flush_buffer (stream, cancellable, &error);
667
668   /* if flushing the buffer didn't work don't even bother
669    * to flush the stream but just report that error */
670   if (res && fdata->flush_stream)
671     res = g_output_stream_flush (base_stream, cancellable, &error);
672
673   if (fdata->close_stream) 
674     {
675      
676       /* if flushing the buffer or the stream returned 
677        * an error report that first error but still try 
678        * close the stream */
679       if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream)))
680         {
681           if (res == FALSE)
682             g_output_stream_close (base_stream, cancellable, NULL);
683           else 
684             res = g_output_stream_close (base_stream, cancellable, &error);
685         }
686     }
687
688   if (res == FALSE)
689     g_task_return_error (task, error);
690   else
691     g_task_return_boolean (task, TRUE);
692 }
693
694 static void
695 g_buffered_output_stream_flush_async (GOutputStream        *stream,
696                                       int                   io_priority,
697                                       GCancellable         *cancellable,
698                                       GAsyncReadyCallback   callback,
699                                       gpointer              data)
700 {
701   GTask *task;
702   FlushData *fdata;
703
704   fdata = g_slice_new0 (FlushData);
705   fdata->flush_stream = TRUE;
706   fdata->close_stream = FALSE;
707
708   task = g_task_new (stream, cancellable, callback, data);
709   g_task_set_source_tag (task, g_buffered_output_stream_flush_async);
710   g_task_set_task_data (task, fdata, free_flush_data);
711   g_task_set_priority (task, io_priority);
712
713   g_task_run_in_thread (task, flush_buffer_thread);
714   g_object_unref (task);
715 }
716
717 static gboolean
718 g_buffered_output_stream_flush_finish (GOutputStream        *stream,
719                                        GAsyncResult         *result,
720                                        GError              **error)
721 {
722   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
723
724   return g_task_propagate_boolean (G_TASK (result), error);
725 }
726
727 static void
728 g_buffered_output_stream_close_async (GOutputStream        *stream,
729                                       int                   io_priority,
730                                       GCancellable         *cancellable,
731                                       GAsyncReadyCallback   callback,
732                                       gpointer              data)
733 {
734   GTask *task;
735   FlushData *fdata;
736
737   fdata = g_slice_new0 (FlushData);
738   fdata->close_stream = TRUE;
739
740   task = g_task_new (stream, cancellable, callback, data);
741   g_task_set_source_tag (task, g_buffered_output_stream_close_async);
742   g_task_set_task_data (task, fdata, free_flush_data);
743   g_task_set_priority (task, io_priority);
744
745   g_task_run_in_thread (task, flush_buffer_thread);
746   g_object_unref (task);
747 }
748
749 static gboolean
750 g_buffered_output_stream_close_finish (GOutputStream        *stream,
751                                        GAsyncResult         *result,
752                                        GError              **error)
753 {
754   g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
755
756   return g_task_propagate_boolean (G_TASK (result), error);
757 }