139d5882877ac2f995199b13a7ef20ebf5c07c31
[platform/upstream/glib.git] / gio / gbufferedoutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Christian Kellner <gicmo@gnome.org> 
21  */
22
23 #include <config.h>
24 #include "gbufferedoutputstream.h"
25 #include "goutputstream.h"
26 #include "gsimpleasyncresult.h"
27 #include "string.h"
28 #include "glibintl.h"
29
30 #include <gioalias.h>
31
32 /**
33  * SECTION:gbufferedoutputstream
34  * @short_description: Buffered Output Stream
35  * @see_also: #GFilterOutputStream, #GOutputStream
36  * 
37  * Buffered output stream implements #GFilterOutputStream and provides 
38  * for buffered writes. 
39  * 
40  * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes.
41  * 
42  * To create a buffered output stream, use g_buffered_output_stream_new(), or 
43  * g_buffered_output_stream_new_sized() to specify the buffer's size at construction.
44  * 
 * To get the size of a buffer within a buffered output stream, use 
46  * g_buffered_output_stream_get_buffer_size(). To change the size of a 
47  * buffered output stream's buffer, use g_buffered_output_stream_set_buffer_size(). 
48  * Note: the buffer's size cannot be reduced below the size of the data within the
49  * buffer.
50  *
51  **/
52
53
54
/* Default value of the "buffer-size" property, in bytes (4 KiB). */
#define DEFAULT_BUFFER_SIZE 4096
56
57 struct _GBufferedOutputStreamPrivate {
58   guint8 *buffer; 
59   gsize   len;
60   goffset pos;
61   gboolean auto_grow;
62 };
63
/* Property identifiers.  PROP_0 only occupies id 0 so that the real
 * properties start at 1. */
enum
{
  PROP_0,
  PROP_BUFSIZE  /* "buffer-size" */
};
68
69 static void     g_buffered_output_stream_set_property (GObject      *object,
70                                                        guint         prop_id,
71                                                        const GValue *value,
72                                                        GParamSpec   *pspec);
73
74 static void     g_buffered_output_stream_get_property (GObject    *object,
75                                                        guint       prop_id,
76                                                        GValue     *value,
77                                                        GParamSpec *pspec);
78 static void     g_buffered_output_stream_finalize     (GObject *object);
79
80
81 static gssize   g_buffered_output_stream_write        (GOutputStream *stream,
82                                                        const void    *buffer,
83                                                        gsize          count,
84                                                        GCancellable  *cancellable,
85                                                        GError       **error);
86 static gboolean g_buffered_output_stream_flush        (GOutputStream    *stream,
87                                                        GCancellable  *cancellable,
88                                                        GError          **error);
89 static gboolean g_buffered_output_stream_close        (GOutputStream  *stream,
90                                                        GCancellable   *cancellable,
91                                                        GError        **error);
92
93 static void     g_buffered_output_stream_write_async  (GOutputStream        *stream,
94                                                        const void           *buffer,
95                                                        gsize                 count,
96                                                        int                   io_priority,
97                                                        GCancellable         *cancellable,
98                                                        GAsyncReadyCallback   callback,
99                                                        gpointer              data);
100 static gssize   g_buffered_output_stream_write_finish (GOutputStream        *stream,
101                                                        GAsyncResult         *result,
102                                                        GError              **error);
103 static void     g_buffered_output_stream_flush_async  (GOutputStream        *stream,
104                                                        int                   io_priority,
105                                                        GCancellable         *cancellable,
106                                                        GAsyncReadyCallback   callback,
107                                                        gpointer              data);
108 static gboolean g_buffered_output_stream_flush_finish (GOutputStream        *stream,
109                                                        GAsyncResult         *result,
110                                                        GError              **error);
111 static void     g_buffered_output_stream_close_async  (GOutputStream        *stream,
112                                                        int                   io_priority,
113                                                        GCancellable         *cancellable,
114                                                        GAsyncReadyCallback   callback,
115                                                        gpointer              data);
116 static gboolean g_buffered_output_stream_close_finish (GOutputStream        *stream,
117                                                        GAsyncResult         *result,
118                                                        GError              **error);
119
120 G_DEFINE_TYPE (GBufferedOutputStream,
121                g_buffered_output_stream,
122                G_TYPE_FILTER_OUTPUT_STREAM)
123
124
125 static void
126 g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass)
127 {
128   GObjectClass *object_class;
129   GOutputStreamClass *ostream_class;
130  
131   g_type_class_add_private (klass, sizeof (GBufferedOutputStreamPrivate));
132
133   object_class = G_OBJECT_CLASS (klass);
134   object_class->get_property = g_buffered_output_stream_get_property;
135   object_class->set_property = g_buffered_output_stream_set_property;
136   object_class->finalize     = g_buffered_output_stream_finalize;
137
138   ostream_class = G_OUTPUT_STREAM_CLASS (klass);
139   ostream_class->write = g_buffered_output_stream_write;
140   ostream_class->flush = g_buffered_output_stream_flush;
141   ostream_class->close = g_buffered_output_stream_close;
142   ostream_class->write_async  = g_buffered_output_stream_write_async;
143   ostream_class->write_finish = g_buffered_output_stream_write_finish;
144   ostream_class->flush_async  = g_buffered_output_stream_flush_async;
145   ostream_class->flush_finish = g_buffered_output_stream_flush_finish;
146   ostream_class->close_async  = g_buffered_output_stream_close_async;
147   ostream_class->close_finish = g_buffered_output_stream_close_finish;
148
149   g_object_class_install_property (object_class,
150                                    PROP_BUFSIZE,
151                                    g_param_spec_uint ("buffer-size",
152                                                       P_("Buffer Size"),
153                                                       P_("The size of the backend buffer"),
154                                                       1,
155                                                       G_MAXUINT,
156                                                       DEFAULT_BUFFER_SIZE,
157                                                       G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY |
158                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
159
160 }
161
162 /**
163  * g_buffered_output_stream_get_buffer_size:
164  * @stream: a #GBufferedOutputStream.
165  * 
166  * Gets the size of the buffer in the @stream.
167  * 
168  * Returns: the current size of the buffer.
169  **/
170 gsize
171 g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream)
172 {
173   g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1);
174
175   return stream->priv->len;
176 }
177
178 /**
179  * g_buffered_output_stream_set_buffer_size:
180  * @stream: a #GBufferedOutputStream.
181  * @size: a #gsize.
182  *
183  * Sets the size of the internal buffer to @size.
184  **/    
185 void
186 g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream,
187                                          gsize                 size)
188 {
189   GBufferedOutputStreamPrivate *priv;
190   guint8 *buffer;
191   
192   g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
193
194   priv = stream->priv;
195   
196   if (priv->buffer)
197     {
198       size = MAX (size, priv->pos);
199
200       buffer = g_malloc (size);
201       memcpy (buffer, priv->buffer, priv->pos);
202       g_free (priv->buffer);
203       priv->buffer = buffer;
204       priv->len = size;
205       /* Keep old pos */
206     }
207   else
208     {
209       priv->buffer = g_malloc (size);
210       priv->len = size;
211       priv->pos = 0;
212     }
213 }
214
215 /**
216  * g_buffered_output_stream_get_auto_grow:
217  * @stream: a #GBufferedOutputStream.
218  * 
219  * Checks if the buffer automatically grows as data is added.
220  * 
221  * Returns: %TRUE if the @stream's buffer automatically grows,
222  * %FALSE otherwise.
223  **/  
224 gboolean
225 g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream)
226 {
227   g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE);
228
229   return stream->priv->auto_grow;
230 }
231
232 /**
233  * g_buffered_output_stream_set_auto_grow:
234  * @stream: a #GBufferedOutputStream.
235  * @auto_grow: a #gboolean.
236  *
237  * Sets whether or not the @stream's buffer should automatically grow.
238  **/
239 void
240 g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream,
241                                        gboolean              auto_grow)
242 {
243   g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
244
245   stream->priv->auto_grow = auto_grow;
246 }
247
248 static void
249 g_buffered_output_stream_set_property (GObject         *object,
250                                        guint            prop_id,
251                                        const GValue    *value,
252                                        GParamSpec      *pspec)
253 {
254   GBufferedOutputStream *buffered_stream;
255   GBufferedOutputStreamPrivate *priv;
256
257   buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
258   priv = buffered_stream->priv;
259
260   switch (prop_id) 
261     {
262
263     case PROP_BUFSIZE:
264       g_buffered_output_stream_set_buffer_size (buffered_stream, g_value_get_uint (value));
265       break;    
266
267     default:
268       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
269       break;
270     }
271
272 }
273
274 static void
275 g_buffered_output_stream_get_property (GObject    *object,
276                                        guint       prop_id,
277                                        GValue     *value,
278                                        GParamSpec *pspec)
279 {
280   GBufferedOutputStream *buffered_stream;
281   GBufferedOutputStreamPrivate *priv;
282
283   buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
284   priv = buffered_stream->priv;
285
286   switch (prop_id)
287     {
288
289     case PROP_BUFSIZE:
290       g_value_set_uint (value, priv->len);
291       break;
292
293     default:
294       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
295       break;
296     }
297
298 }
299
300 static void
301 g_buffered_output_stream_finalize (GObject *object)
302 {
303   GBufferedOutputStream *stream;
304   GBufferedOutputStreamPrivate *priv;
305
306   stream = G_BUFFERED_OUTPUT_STREAM (object);
307   priv = stream->priv;
308
309   g_free (priv->buffer);
310
311   if (G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize)
312     (*G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize) (object);
313 }
314
315 static void
316 g_buffered_output_stream_init (GBufferedOutputStream *stream)
317 {
318   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
319                                               G_TYPE_BUFFERED_OUTPUT_STREAM,
320                                               GBufferedOutputStreamPrivate);
321
322 }
323
324 /**
325  * g_buffered_output_stream_new:
326  * @base_stream: a #GOutputStream.
327  * 
328  * Creates a new buffered output stream for a base stream.
329  * 
330  * Returns: a #GOutputStream for the given @base_stream.
331  **/  
332 GOutputStream *
333 g_buffered_output_stream_new (GOutputStream *base_stream)
334 {
335   GOutputStream *stream;
336
337   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
338
339   stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
340                          "base-stream", base_stream,
341                          NULL);
342
343   return stream;
344 }
345
346 /**
347  * g_buffered_output_stream_new_sized:
348  * @base_stream: a #GOutputStream.
 * @size: a #guint giving the size of the internal buffer.
350  * 
351  * Creates a new buffered output stream with a given buffer size.
352  * 
353  * Returns: a #GOutputStream with an internal buffer set to @size.
354  **/  
355 GOutputStream *
356 g_buffered_output_stream_new_sized (GOutputStream *base_stream,
357                                     guint          size)
358 {
359   GOutputStream *stream;
360
361   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
362
363   stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
364                          "base-stream", base_stream,
365                          "buffer-size", size,
366                          NULL);
367
368   return stream;
369 }
370
371 static gboolean
372 flush_buffer (GBufferedOutputStream  *stream,
373               GCancellable           *cancellable,
374               GError                 **error)
375 {
376   GBufferedOutputStreamPrivate *priv;
377   GOutputStream                *base_stream;
378   gboolean                      res;
379   gsize                         bytes_written;
380   gsize                         count;
381
382   priv = stream->priv;
383   bytes_written = 0;
384   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
385
386   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE);
387
388   res = g_output_stream_write_all (base_stream,
389                                    priv->buffer,
390                                    priv->pos,
391                                    &bytes_written,
392                                    cancellable,
393                                    error);
394
395   count = priv->pos - bytes_written;
396
397   if (count > 0)
398     g_memmove (priv->buffer, priv->buffer + bytes_written, count);
399   
400   priv->pos -= bytes_written;
401
402   return res;
403 }
404
405 static gssize
406 g_buffered_output_stream_write  (GOutputStream *stream,
407                                  const void    *buffer,
408                                  gsize          count,
409                                  GCancellable  *cancellable,
410                                  GError       **error)
411 {
412   GBufferedOutputStream        *bstream;
413   GBufferedOutputStreamPrivate *priv;
414   gboolean res;
415   gsize    n;
416   gsize new_size;
417
418   bstream = G_BUFFERED_OUTPUT_STREAM (stream);
419   priv = bstream->priv;
420
421   n = priv->len - priv->pos;
422
423   if (priv->auto_grow && n < count)
424     {
425       new_size = MAX (priv->len * 2, priv->len + count);
426       g_buffered_output_stream_set_buffer_size (bstream, new_size);
427     }
428   else if (n == 0)
429     {
430       res = flush_buffer (bstream, cancellable, error);
431       
432       if (res == FALSE)
433         return -1;
434     }
435
436   n = priv->len - priv->pos;
437   
438   count = MIN (count, n);
439   memcpy (priv->buffer + priv->pos, buffer, count);
440   priv->pos += count;
441
442   return count;
443 }
444
445 static gboolean
446 g_buffered_output_stream_flush (GOutputStream  *stream,
447                                 GCancellable   *cancellable,
448                                 GError        **error)
449 {
450   GBufferedOutputStream *bstream;
451   GBufferedOutputStreamPrivate *priv;
452   GOutputStream                *base_stream;
453   gboolean res;
454
455   bstream = G_BUFFERED_OUTPUT_STREAM (stream);
456   priv = bstream->priv;
457   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
458
459   res = flush_buffer (bstream, cancellable, error);
460
461   if (res == FALSE) {
462     return FALSE;
463   }
464
465   res = g_output_stream_flush (base_stream,
466                                cancellable,
467                                error);
468   return res;
469 }
470
471 static gboolean
472 g_buffered_output_stream_close (GOutputStream  *stream,
473                                 GCancellable   *cancellable,
474                                 GError        **error)
475 {
476   GBufferedOutputStream        *bstream;
477   GBufferedOutputStreamPrivate *priv;
478   GOutputStream                *base_stream;
479   gboolean                      res;
480
481   bstream = G_BUFFERED_OUTPUT_STREAM (stream);
482   priv = bstream->priv;
483   base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream;
484
485   res = flush_buffer (bstream, cancellable, error);
486
487   /* report the first error but still close the stream */
488   if (res)
489     {
490       res = g_output_stream_close (base_stream,
491                                    cancellable,
492                                    error); 
493     }
494   else
495     {
496       g_output_stream_close (base_stream,
497                              cancellable,
498                              NULL); 
499     }
500
501   return res;
502 }
503
504 /* ************************** */
505 /* Async stuff implementation */
506 /* ************************** */
507
508 /* TODO: This should be using the base class async ops, not threads */
509
510 typedef struct {
511
512   guint flush_stream : 1;
513   guint close_stream : 1;
514
515 } FlushData;
516
517 static void
518 free_flush_data (gpointer data)
519 {
520   g_slice_free (FlushData, data);
521 }
522
523 /* This function is used by all three (i.e. 
524  * _write, _flush, _close) functions since
525  * all of them will need to flush the buffer
526  * and so closing and writing is just a special
 * case of flushing + some additional stuff */
528 static void
529 flush_buffer_thread (GSimpleAsyncResult *result,
530                      GObject            *object,
531                      GCancellable       *cancellable)
532 {
533   GBufferedOutputStream *stream;
534   GOutputStream *base_stream;
535   FlushData     *fdata;
536   gboolean       res;
537   GError        *error = NULL;
538
539   stream = G_BUFFERED_OUTPUT_STREAM (object);
540   fdata = g_simple_async_result_get_op_res_gpointer (result);
541   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
542
543   res = flush_buffer (stream, cancellable, &error);
544
545   /* if flushing the buffer didn't work don't even bother
546    * to flush the stream but just report that error */
547   if (res && fdata->flush_stream)
548     {
549       res = g_output_stream_flush (base_stream,
550                                    cancellable,
551                                    &error);
552     }
553
554   if (fdata->close_stream) 
555     {
556      
557       /* if flushing the buffer or the stream returned 
558        * an error report that first error but still try 
559        * close the stream */
560       if (res == FALSE)
561         {
562           g_output_stream_close (base_stream,
563                                  cancellable,
564                                  NULL);
565         } 
566       else 
567         {
568           res = g_output_stream_close (base_stream,
569                                        cancellable,
570                                        &error);
571         } 
572
573     }
574
575   if (res == FALSE)
576     {
577       g_simple_async_result_set_from_error (result, error);
578       g_error_free (error);
579     }
580 }
581
582 typedef struct {
583     
584   FlushData fdata;
585
586   gsize  count;
587   const void  *buffer;
588
589 } WriteData;
590
591 static void 
592 free_write_data (gpointer data)
593 {
594   g_slice_free (WriteData, data);
595 }
596
597 static void
598 g_buffered_output_stream_write_async (GOutputStream        *stream,
599                                       const void           *buffer,
600                                       gsize                 count,
601                                       int                   io_priority,
602                                       GCancellable         *cancellable,
603                                       GAsyncReadyCallback   callback,
604                                       gpointer              data)
605 {
606   GBufferedOutputStream *buffered_stream;
607   GBufferedOutputStreamPrivate *priv;
608   GSimpleAsyncResult *res;
609   WriteData *wdata;
610
611   buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
612   priv = buffered_stream->priv;
613
614   wdata = g_slice_new (WriteData);
615   wdata->count  = count;
616   wdata->buffer = buffer;
617
618   res = g_simple_async_result_new (G_OBJECT (stream),
619                                    callback,
620                                    data,
621                                    g_buffered_output_stream_write_async);
622
623   g_simple_async_result_set_op_res_gpointer (res, wdata, free_write_data);
624
625   /* if we have space left directly call the
626    * callback (from idle) otherwise schedule a buffer 
627    * flush in the thread. In both cases the actual
628    * copying of the data to the buffer will be done in
629    * the write_finish () func since that should
630    * be fast enough */
631   if (priv->len - priv->pos > 0)
632     {
633       g_simple_async_result_complete_in_idle (res);
634     }
635   else
636     {
637       wdata->fdata.flush_stream = FALSE;
638       wdata->fdata.close_stream = FALSE;
639       g_simple_async_result_run_in_thread (res, 
640                                            flush_buffer_thread, 
641                                            io_priority,
642                                            cancellable);
643       g_object_unref (res);
644     }
645 }
646
647 static gssize
648 g_buffered_output_stream_write_finish (GOutputStream        *stream,
649                                        GAsyncResult         *result,
650                                        GError              **error)
651 {
652   GBufferedOutputStreamPrivate *priv;
653   GBufferedOutputStream        *buffered_stream;
654   GSimpleAsyncResult *simple;
655   WriteData          *wdata;
656   gssize              count;
657
658   simple = G_SIMPLE_ASYNC_RESULT (result);
659   buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
660   priv = buffered_stream->priv;
661
662   g_assert (g_simple_async_result_get_source_tag (simple) == 
663             g_buffered_output_stream_write_async);
664
665   wdata = g_simple_async_result_get_op_res_gpointer (simple);
666
667   /* Now do the real copying of data to the buffer */
668   count = priv->len - priv->pos; 
669   count = MIN (wdata->count, count);
670
671   memcpy (priv->buffer + priv->pos, wdata->buffer, count);
672   
673   priv->pos += count;
674
675   return count;
676 }
677
678 static void
679 g_buffered_output_stream_flush_async (GOutputStream        *stream,
680                                       int                   io_priority,
681                                       GCancellable         *cancellable,
682                                       GAsyncReadyCallback   callback,
683                                       gpointer              data)
684 {
685   GSimpleAsyncResult *res;
686   FlushData          *fdata;
687
688   fdata = g_slice_new (FlushData);
689   fdata->flush_stream = TRUE;
690   fdata->close_stream = FALSE;
691
692   res = g_simple_async_result_new (G_OBJECT (stream),
693                                    callback,
694                                    data,
695                                    g_buffered_output_stream_flush_async);
696
697   g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
698
699   g_simple_async_result_run_in_thread (res, 
700                                        flush_buffer_thread, 
701                                        io_priority,
702                                        cancellable);
703   g_object_unref (res);
704 }
705
706 static gboolean
707 g_buffered_output_stream_flush_finish (GOutputStream        *stream,
708                                        GAsyncResult         *result,
709                                        GError              **error)
710 {
711   GSimpleAsyncResult *simple;
712
713   simple = G_SIMPLE_ASYNC_RESULT (result);
714
715   g_assert (g_simple_async_result_get_source_tag (simple) == 
716             g_buffered_output_stream_flush_async);
717
718   return TRUE;
719 }
720
721 static void
722 g_buffered_output_stream_close_async (GOutputStream        *stream,
723                                       int                   io_priority,
724                                       GCancellable         *cancellable,
725                                       GAsyncReadyCallback   callback,
726                                       gpointer              data)
727 {
728   GSimpleAsyncResult *res;
729   FlushData          *fdata;
730
731   fdata = g_slice_new (FlushData);
732   fdata->close_stream = TRUE;
733
734   res = g_simple_async_result_new (G_OBJECT (stream),
735                                    callback,
736                                    data,
737                                    g_buffered_output_stream_close_async);
738
739   g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
740
741   g_simple_async_result_run_in_thread (res, 
742                                        flush_buffer_thread, 
743                                        io_priority,
744                                        cancellable);
745   g_object_unref (res);
746 }
747
748 static gboolean
749 g_buffered_output_stream_close_finish (GOutputStream        *stream,
750                                        GAsyncResult         *result,
751                                        GError              **error)
752 {
753   GSimpleAsyncResult *simple;
754
755   simple = G_SIMPLE_ASYNC_RESULT (result);
756
757   g_assert (g_simple_async_result_get_source_tag (simple) == 
758             g_buffered_output_stream_flush_async);
759
760   return TRUE;
761 }
762
763 #define __G_BUFFERED_OUTPUT_STREAM_C__
764 #include "gioaliasdef.c"