gio/ docs/reference/gio Merged gio-standalone into glib.
[platform/upstream/glib.git] / gio / gbufferedoutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Christian Kellner <gicmo@gnome.org> 
21  */
22
23 #include <config.h>
24
25 #include "gbufferedoutputstream.h"
26 #include "goutputstream.h"
27 #include "gsimpleasyncresult.h"
28 #include "string.h"
29
30 #include "glibintl.h"
31
32 #define DEFAULT_BUFFER_SIZE 4096
33
34 struct _GBufferedOutputStreamPrivate {
35   guint8 *buffer; 
36   gsize   len;
37   goffset pos;
38   gboolean auto_grow;
39 };
40
/* Property identifiers installed on the class in class_init. */
enum {
  PROP_0,       /* placeholder so that the first real property ID is non-zero */
  PROP_BUFSIZE  /* "buffer-size" */
};
45
46 static void     g_buffered_output_stream_set_property (GObject      *object,
47                                                        guint         prop_id,
48                                                        const GValue *value,
49                                                        GParamSpec   *pspec);
50
51 static void     g_buffered_output_stream_get_property (GObject    *object,
52                                                        guint       prop_id,
53                                                        GValue     *value,
54                                                        GParamSpec *pspec);
55 static void     g_buffered_output_stream_finalize     (GObject *object);
56
57
58 static gssize   g_buffered_output_stream_write        (GOutputStream *stream,
59                                                        const void    *buffer,
60                                                        gsize          count,
61                                                        GCancellable  *cancellable,
62                                                        GError       **error);
63 static gboolean g_buffered_output_stream_flush        (GOutputStream    *stream,
64                                                        GCancellable  *cancellable,
65                                                        GError          **error);
66 static gboolean g_buffered_output_stream_close        (GOutputStream  *stream,
67                                                        GCancellable   *cancellable,
68                                                        GError        **error);
69
70 static void     g_buffered_output_stream_write_async  (GOutputStream        *stream,
71                                                        const void           *buffer,
72                                                        gsize                 count,
73                                                        int                   io_priority,
74                                                        GCancellable         *cancellable,
75                                                        GAsyncReadyCallback   callback,
76                                                        gpointer              data);
77 static gssize   g_buffered_output_stream_write_finish (GOutputStream        *stream,
78                                                        GAsyncResult         *result,
79                                                        GError              **error);
80 static void     g_buffered_output_stream_flush_async  (GOutputStream        *stream,
81                                                        int                   io_priority,
82                                                        GCancellable         *cancellable,
83                                                        GAsyncReadyCallback   callback,
84                                                        gpointer              data);
85 static gboolean g_buffered_output_stream_flush_finish (GOutputStream        *stream,
86                                                        GAsyncResult         *result,
87                                                        GError              **error);
88 static void     g_buffered_output_stream_close_async  (GOutputStream        *stream,
89                                                        int                   io_priority,
90                                                        GCancellable         *cancellable,
91                                                        GAsyncReadyCallback   callback,
92                                                        gpointer              data);
93 static gboolean g_buffered_output_stream_close_finish (GOutputStream        *stream,
94                                                        GAsyncResult         *result,
95                                                        GError              **error);
96
97 G_DEFINE_TYPE (GBufferedOutputStream,
98                g_buffered_output_stream,
99                G_TYPE_FILTER_OUTPUT_STREAM)
100
101
102 static void
103 g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass)
104 {
105   GObjectClass *object_class;
106   GOutputStreamClass *ostream_class;
107  
108   g_type_class_add_private (klass, sizeof (GBufferedOutputStreamPrivate));
109
110   object_class = G_OBJECT_CLASS (klass);
111   object_class->get_property = g_buffered_output_stream_get_property;
112   object_class->set_property = g_buffered_output_stream_set_property;
113   object_class->finalize     = g_buffered_output_stream_finalize;
114
115   ostream_class = G_OUTPUT_STREAM_CLASS (klass);
116   ostream_class->write = g_buffered_output_stream_write;
117   ostream_class->flush = g_buffered_output_stream_flush;
118   ostream_class->close = g_buffered_output_stream_close;
119   ostream_class->write_async  = g_buffered_output_stream_write_async;
120   ostream_class->write_finish = g_buffered_output_stream_write_finish;
121   ostream_class->flush_async  = g_buffered_output_stream_flush_async;
122   ostream_class->flush_finish = g_buffered_output_stream_flush_finish;
123   ostream_class->close_async  = g_buffered_output_stream_close_async;
124   ostream_class->close_finish = g_buffered_output_stream_close_finish;
125
126   g_object_class_install_property (object_class,
127                                    PROP_BUFSIZE,
128                                    g_param_spec_uint ("buffer-size",
129                                                       P_("Buffer Size"),
130                                                       P_("The size of the backend buffer"),
131                                                       1,
132                                                       G_MAXUINT,
133                                                       DEFAULT_BUFFER_SIZE,
134                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY |
135                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
136
137 }
138
139 /**
140  * g_buffered_output_stream_get_buffer_size:
141  * @stream: a #GBufferedOutputStream.
142  * 
143  * Returns: the current size of the buffer.
144  **/
145 gsize
146 g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream)
147 {
148   g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1);
149
150   return stream->priv->len;
151 }
152
153 /**
154  * g_buffered_output_stream_set_buffer_size:
155  * @stream: a #GBufferedOutputStream.
156  * @size: a #gsize.
157  *
158  * Sets the size of the internal buffer to @size.
159  * 
160  **/    
161 void
162 g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream,
163                                          gsize                 size)
164 {
165   GBufferedOutputStreamPrivate *priv;
166   guint8 *buffer;
167   
168   g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
169
170   priv = stream->priv;
171   
172   if (priv->buffer)
173     {
174       size = MAX (size, priv->pos);
175
176       buffer = g_malloc (size);
177       memcpy (buffer, priv->buffer, priv->pos);
178       g_free (priv->buffer);
179       priv->buffer = buffer;
180       priv->len = size;
181       /* Keep old pos */
182     }
183   else
184     {
185       priv->buffer = g_malloc (size);
186       priv->len = size;
187       priv->pos = 0;
188     }
189 }
190
191 /**
192  * g_buffered_output_stream_get_auto_grow:
193  * @stream: a #GBufferedOutputStream.
194  * 
195  * Returns: %TRUE if the @stream's buffer automatically grows,
196  * %FALSE otherwise.
197  **/  
198 gboolean
199 g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream)
200 {
201   g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE);
202
203   return stream->priv->auto_grow;
204 }
205
206 /**
207  * g_buffered_output_stream_set_auto_grow:
208  * @stream: a #GBufferedOutputStream.
209  * @auto_grow: a boolean.
210  *
211  * Sets whether or not the @stream's buffer should automatically grow.
212  * 
213  **/
214 void
215 g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream,
216                                        gboolean              auto_grow)
217 {
218   g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
219
220   stream->priv->auto_grow = auto_grow;
221 }
222
223 static void
224 g_buffered_output_stream_set_property (GObject         *object,
225                                        guint            prop_id,
226                                        const GValue    *value,
227                                        GParamSpec      *pspec)
228 {
229   GBufferedOutputStream *buffered_stream;
230   GBufferedOutputStreamPrivate *priv;
231
232   buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
233   priv = buffered_stream->priv;
234
235   switch (prop_id) 
236     {
237
238     case PROP_BUFSIZE:
239       g_buffered_output_stream_set_buffer_size (buffered_stream, g_value_get_uint (value));
240       break;    
241
242     default:
243       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
244       break;
245     }
246
247 }
248
249 static void
250 g_buffered_output_stream_get_property (GObject    *object,
251                                        guint       prop_id,
252                                        GValue     *value,
253                                        GParamSpec *pspec)
254 {
255   GBufferedOutputStream *buffered_stream;
256   GBufferedOutputStreamPrivate *priv;
257
258   buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
259   priv = buffered_stream->priv;
260
261   switch (prop_id)
262     {
263
264     case PROP_BUFSIZE:
265       g_value_set_uint (value, priv->len);
266       break;
267
268     default:
269       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
270       break;
271     }
272
273 }
274
275 static void
276 g_buffered_output_stream_finalize (GObject *object)
277 {
278   GBufferedOutputStream *stream;
279   GBufferedOutputStreamPrivate *priv;
280
281   stream = G_BUFFERED_OUTPUT_STREAM (object);
282   priv = stream->priv;
283
284   g_free (priv->buffer);
285
286   if (G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize)
287     (*G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize) (object);
288 }
289
290 static void
291 g_buffered_output_stream_init (GBufferedOutputStream *stream)
292 {
293   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
294                                               G_TYPE_BUFFERED_OUTPUT_STREAM,
295                                               GBufferedOutputStreamPrivate);
296
297 }
298
299 /**
300  * g_buffered_output_stream_new:
301  * @base_stream: a #GOutputStream.
302  * 
303  * Returns: a #GOutputStream for the given @base_stream.
304  **/  
305 GOutputStream *
306 g_buffered_output_stream_new (GOutputStream *base_stream)
307 {
308   GOutputStream *stream;
309
310   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
311
312   stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
313                          "base-stream", base_stream,
314                          NULL);
315
316   return stream;
317 }
318
319 /**
320  * g_buffered_output_stream_new_sized:
321  * @base_stream: a #GOutputStream.
322  * @size: a #gsize.
323  * 
324  * Returns: a #GOutputStream with an internal buffer set to @size.
325  **/  
326 GOutputStream *
327 g_buffered_output_stream_new_sized (GOutputStream *base_stream,
328                                     guint          size)
329 {
330   GOutputStream *stream;
331
332   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
333
334   stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
335                          "base-stream", base_stream,
336                          "buffer-size", size,
337                          NULL);
338
339   return stream;
340 }
341
342 static gboolean
343 flush_buffer (GBufferedOutputStream  *stream,
344               GCancellable           *cancellable,
345               GError                 **error)
346 {
347   GBufferedOutputStreamPrivate *priv;
348   GOutputStream                *base_stream;
349   gboolean                      res;
350   gsize                         bytes_written;
351   gsize                         count;
352
353   priv = stream->priv;
354   bytes_written = 0;
355   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
356
357   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE);
358
359   res = g_output_stream_write_all (base_stream,
360                                    priv->buffer,
361                                    priv->pos,
362                                    &bytes_written,
363                                    cancellable,
364                                    error);
365
366   count = priv->pos - bytes_written;
367
368   if (count > 0)
369     g_memmove (priv->buffer, priv->buffer + bytes_written, count);
370   
371   priv->pos -= bytes_written;
372
373   return res;
374 }
375
376 static gssize
377 g_buffered_output_stream_write  (GOutputStream *stream,
378                                  const void    *buffer,
379                                  gsize          count,
380                                  GCancellable  *cancellable,
381                                  GError       **error)
382 {
383   GBufferedOutputStream        *bstream;
384   GBufferedOutputStreamPrivate *priv;
385   gboolean res;
386   gsize    n;
387   gsize new_size;
388
389   bstream = G_BUFFERED_OUTPUT_STREAM (stream);
390   priv = bstream->priv;
391
392   n = priv->len - priv->pos;
393
394   if (priv->auto_grow && n < count)
395     {
396       new_size = MAX (priv->len * 2, priv->len + count);
397       g_buffered_output_stream_set_buffer_size (bstream, new_size);
398     }
399   else if (n == 0)
400     {
401       res = flush_buffer (bstream, cancellable, error);
402       
403       if (res == FALSE)
404         return -1;
405     }
406
407   n = priv->len - priv->pos;
408   
409   count = MIN (count, n);
410   memcpy (priv->buffer + priv->pos, buffer, count);
411   priv->pos += count;
412
413   return count;
414 }
415
416 static gboolean
417 g_buffered_output_stream_flush (GOutputStream  *stream,
418                                 GCancellable   *cancellable,
419                                 GError        **error)
420 {
421   GBufferedOutputStream *bstream;
422   GBufferedOutputStreamPrivate *priv;
423   GOutputStream                *base_stream;
424   gboolean res;
425
426   bstream = G_BUFFERED_OUTPUT_STREAM (stream);
427   priv = bstream->priv;
428   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
429
430   res = flush_buffer (bstream, cancellable, error);
431
432   if (res == FALSE) {
433     return FALSE;
434   }
435
436   res = g_output_stream_flush (base_stream,
437                                cancellable,
438                                error);
439   return res;
440 }
441
442 static gboolean
443 g_buffered_output_stream_close (GOutputStream  *stream,
444                                 GCancellable   *cancellable,
445                                 GError        **error)
446 {
447   GBufferedOutputStream        *bstream;
448   GBufferedOutputStreamPrivate *priv;
449   GOutputStream                *base_stream;
450   gboolean                      res;
451
452   bstream = G_BUFFERED_OUTPUT_STREAM (stream);
453   priv = bstream->priv;
454   base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream;
455
456   res = flush_buffer (bstream, cancellable, error);
457
458   /* report the first error but still close the stream */
459   if (res)
460     {
461       res = g_output_stream_close (base_stream,
462                                    cancellable,
463                                    error); 
464     }
465   else
466     {
467       g_output_stream_close (base_stream,
468                              cancellable,
469                              NULL); 
470     }
471
472   return res;
473 }
474
475 /* ************************** */
476 /* Async stuff implementation */
477 /* ************************** */
478
479 /* TODO: This should be using the base class async ops, not threads */
480
481 typedef struct {
482
483   guint flush_stream : 1;
484   guint close_stream : 1;
485
486 } FlushData;
487
488 static void
489 free_flush_data (gpointer data)
490 {
491   g_slice_free (FlushData, data);
492 }
493
494 /* This function is used by all three (i.e. 
495  * _write, _flush, _close) functions since
496  * all of them will need to flush the buffer
497  * and so closing and writing is just a special
498  * case of flushing + some addition stuff */
499 static void
500 flush_buffer_thread (GSimpleAsyncResult *result,
501                      GObject            *object,
502                      GCancellable       *cancellable)
503 {
504   GBufferedOutputStream *stream;
505   GOutputStream *base_stream;
506   FlushData     *fdata;
507   gboolean       res;
508   GError        *error = NULL;
509
510   stream = G_BUFFERED_OUTPUT_STREAM (object);
511   fdata = g_simple_async_result_get_op_res_gpointer (result);
512   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
513
514   res = flush_buffer (stream, cancellable, &error);
515
516   /* if flushing the buffer didn't work don't even bother
517    * to flush the stream but just report that error */
518   if (res && fdata->flush_stream)
519     {
520       res = g_output_stream_flush (base_stream,
521                                    cancellable,
522                                    &error);
523     }
524
525   if (fdata->close_stream) 
526     {
527      
528       /* if flushing the buffer or the stream returned 
529        * an error report that first error but still try 
530        * close the stream */
531       if (res == FALSE)
532         {
533           g_output_stream_close (base_stream,
534                                  cancellable,
535                                  NULL);
536         } 
537       else 
538         {
539           res = g_output_stream_close (base_stream,
540                                        cancellable,
541                                        &error);
542         } 
543
544     }
545
546   if (res == FALSE)
547     {
548       g_simple_async_result_set_from_error (result, error);
549       g_error_free (error);
550     }
551 }
552
553 typedef struct {
554     
555   FlushData fdata;
556
557   gsize  count;
558   const void  *buffer;
559
560 } WriteData;
561
562 static void 
563 free_write_data (gpointer data)
564 {
565   g_slice_free (WriteData, data);
566 }
567
568 static void
569 g_buffered_output_stream_write_async (GOutputStream        *stream,
570                                       const void           *buffer,
571                                       gsize                 count,
572                                       int                   io_priority,
573                                       GCancellable         *cancellable,
574                                       GAsyncReadyCallback   callback,
575                                       gpointer              data)
576 {
577   GBufferedOutputStream *buffered_stream;
578   GBufferedOutputStreamPrivate *priv;
579   GSimpleAsyncResult *res;
580   WriteData *wdata;
581
582   buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
583   priv = buffered_stream->priv;
584
585   wdata = g_slice_new (WriteData);
586   wdata->count  = count;
587   wdata->buffer = buffer;
588
589   res = g_simple_async_result_new (G_OBJECT (stream),
590                                    callback,
591                                    data,
592                                    g_buffered_output_stream_write_async);
593
594   g_simple_async_result_set_op_res_gpointer (res, wdata, free_write_data);
595
596   /* if we have space left directly call the
597    * callback (from idle) otherwise schedule a buffer 
598    * flush in the thread. In both cases the actual
599    * copying of the data to the buffer will be done in
600    * the write_finish () func since that should
601    * be fast enough */
602   if (priv->len - priv->pos > 0)
603     {
604       g_simple_async_result_complete_in_idle (res);
605     }
606   else
607     {
608       wdata->fdata.flush_stream = FALSE;
609       wdata->fdata.close_stream = FALSE;
610       g_simple_async_result_run_in_thread (res, 
611                                            flush_buffer_thread, 
612                                            io_priority,
613                                            cancellable);
614       g_object_unref (res);
615     }
616 }
617
618 static gssize
619 g_buffered_output_stream_write_finish (GOutputStream        *stream,
620                                        GAsyncResult         *result,
621                                        GError              **error)
622 {
623   GBufferedOutputStreamPrivate *priv;
624   GBufferedOutputStream        *buffered_stream;
625   GSimpleAsyncResult *simple;
626   WriteData          *wdata;
627   gssize              count;
628
629   simple = G_SIMPLE_ASYNC_RESULT (result);
630   buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
631   priv = buffered_stream->priv;
632
633   g_assert (g_simple_async_result_get_source_tag (simple) == 
634             g_buffered_output_stream_write_async);
635
636   wdata = g_simple_async_result_get_op_res_gpointer (simple);
637
638   /* Now do the real copying of data to the buffer */
639   count = priv->len - priv->pos; 
640   count = MIN (wdata->count, count);
641
642   memcpy (priv->buffer + priv->pos, wdata->buffer, count);
643   
644   priv->pos += count;
645
646   return count;
647 }
648
649 static void
650 g_buffered_output_stream_flush_async (GOutputStream        *stream,
651                                       int                   io_priority,
652                                       GCancellable         *cancellable,
653                                       GAsyncReadyCallback   callback,
654                                       gpointer              data)
655 {
656   GSimpleAsyncResult *res;
657   FlushData          *fdata;
658
659   fdata = g_slice_new (FlushData);
660   fdata->flush_stream = TRUE;
661   fdata->close_stream = FALSE;
662
663   res = g_simple_async_result_new (G_OBJECT (stream),
664                                    callback,
665                                    data,
666                                    g_buffered_output_stream_flush_async);
667
668   g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
669
670   g_simple_async_result_run_in_thread (res, 
671                                        flush_buffer_thread, 
672                                        io_priority,
673                                        cancellable);
674   g_object_unref (res);
675 }
676
677 static gboolean
678 g_buffered_output_stream_flush_finish (GOutputStream        *stream,
679                                        GAsyncResult         *result,
680                                        GError              **error)
681 {
682   GSimpleAsyncResult *simple;
683
684   simple = G_SIMPLE_ASYNC_RESULT (result);
685
686   g_assert (g_simple_async_result_get_source_tag (simple) == 
687             g_buffered_output_stream_flush_async);
688
689   return TRUE;
690 }
691
692 static void
693 g_buffered_output_stream_close_async (GOutputStream        *stream,
694                                       int                   io_priority,
695                                       GCancellable         *cancellable,
696                                       GAsyncReadyCallback   callback,
697                                       gpointer              data)
698 {
699   GSimpleAsyncResult *res;
700   FlushData          *fdata;
701
702   fdata = g_slice_new (FlushData);
703   fdata->close_stream = TRUE;
704
705   res = g_simple_async_result_new (G_OBJECT (stream),
706                                    callback,
707                                    data,
708                                    g_buffered_output_stream_close_async);
709
710   g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
711
712   g_simple_async_result_run_in_thread (res, 
713                                        flush_buffer_thread, 
714                                        io_priority,
715                                        cancellable);
716   g_object_unref (res);
717 }
718
719 static gboolean
720 g_buffered_output_stream_close_finish (GOutputStream        *stream,
721                                        GAsyncResult         *result,
722                                        GError              **error)
723 {
724   GSimpleAsyncResult *simple;
725
726   simple = G_SIMPLE_ASYNC_RESULT (result);
727
728   g_assert (g_simple_async_result_get_source_tag (simple) == 
729             g_buffered_output_stream_flush_async);
730
731   return TRUE;
732 }
733
734 /* vim: ts=2 sw=2 et */