Add private _g_bus_get_singleton_if_exists() function
[platform/upstream/glib.git] / gio / gbufferedoutputstream.c
1 /* GIO - GLib Input, Output and Streaming Library
2  * 
3  * Copyright (C) 2006-2007 Red Hat, Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General
16  * Public License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18  * Boston, MA 02111-1307, USA.
19  *
20  * Author: Christian Kellner <gicmo@gnome.org> 
21  */
22
23 #include "config.h"
24 #include "gbufferedoutputstream.h"
25 #include "goutputstream.h"
26 #include "gsimpleasyncresult.h"
27 #include "string.h"
28 #include "glibintl.h"
29
30 /**
31  * SECTION:gbufferedoutputstream
32  * @short_description: Buffered Output Stream
33  * @include: gio/gio.h
34  * @see_also: #GFilterOutputStream, #GOutputStream
35  * 
36  * Buffered output stream implements #GFilterOutputStream and provides 
37  * for buffered writes. 
38  * 
39  * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes.
40  * 
41  * To create a buffered output stream, use g_buffered_output_stream_new(), 
42  * or g_buffered_output_stream_new_sized() to specify the buffer's size 
43  * at construction.
44  * 
45  * To get the size of a buffer within a buffered output stream, use 
46  * g_buffered_output_stream_get_buffer_size(). To change the size of a 
47  * buffered output stream's buffer, use 
48  * g_buffered_output_stream_set_buffer_size(). Note that the buffer's 
49  * size cannot be reduced below the size of the data within the buffer.
50  **/
51
52 #define DEFAULT_BUFFER_SIZE 4096
53
54 struct _GBufferedOutputStreamPrivate {
55   guint8 *buffer; 
56   gsize   len;
57   goffset pos;
58   gboolean auto_grow;
59 };
60
/* Property identifiers. PROP_0 is a placeholder that is never installed,
 * so the real properties start at 1. */
enum
{
  PROP_0,
  PROP_BUFSIZE,    /* "buffer-size" */
  PROP_AUTO_GROW   /* "auto-grow" */
};
66
67 static void     g_buffered_output_stream_set_property (GObject      *object,
68                                                        guint         prop_id,
69                                                        const GValue *value,
70                                                        GParamSpec   *pspec);
71
72 static void     g_buffered_output_stream_get_property (GObject    *object,
73                                                        guint       prop_id,
74                                                        GValue     *value,
75                                                        GParamSpec *pspec);
76 static void     g_buffered_output_stream_finalize     (GObject *object);
77
78
79 static gssize   g_buffered_output_stream_write        (GOutputStream *stream,
80                                                        const void    *buffer,
81                                                        gsize          count,
82                                                        GCancellable  *cancellable,
83                                                        GError       **error);
84 static gboolean g_buffered_output_stream_flush        (GOutputStream    *stream,
85                                                        GCancellable  *cancellable,
86                                                        GError          **error);
87 static gboolean g_buffered_output_stream_close        (GOutputStream  *stream,
88                                                        GCancellable   *cancellable,
89                                                        GError        **error);
90
91 static void     g_buffered_output_stream_flush_async  (GOutputStream        *stream,
92                                                        int                   io_priority,
93                                                        GCancellable         *cancellable,
94                                                        GAsyncReadyCallback   callback,
95                                                        gpointer              data);
96 static gboolean g_buffered_output_stream_flush_finish (GOutputStream        *stream,
97                                                        GAsyncResult         *result,
98                                                        GError              **error);
99 static void     g_buffered_output_stream_close_async  (GOutputStream        *stream,
100                                                        int                   io_priority,
101                                                        GCancellable         *cancellable,
102                                                        GAsyncReadyCallback   callback,
103                                                        gpointer              data);
104 static gboolean g_buffered_output_stream_close_finish (GOutputStream        *stream,
105                                                        GAsyncResult         *result,
106                                                        GError              **error);
107
108 G_DEFINE_TYPE (GBufferedOutputStream,
109                g_buffered_output_stream,
110                G_TYPE_FILTER_OUTPUT_STREAM)
111
112
113 static void
114 g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass)
115 {
116   GObjectClass *object_class;
117   GOutputStreamClass *ostream_class;
118  
119   g_type_class_add_private (klass, sizeof (GBufferedOutputStreamPrivate));
120
121   object_class = G_OBJECT_CLASS (klass);
122   object_class->get_property = g_buffered_output_stream_get_property;
123   object_class->set_property = g_buffered_output_stream_set_property;
124   object_class->finalize     = g_buffered_output_stream_finalize;
125
126   ostream_class = G_OUTPUT_STREAM_CLASS (klass);
127   ostream_class->write_fn = g_buffered_output_stream_write;
128   ostream_class->flush = g_buffered_output_stream_flush;
129   ostream_class->close_fn = g_buffered_output_stream_close;
130   ostream_class->flush_async  = g_buffered_output_stream_flush_async;
131   ostream_class->flush_finish = g_buffered_output_stream_flush_finish;
132   ostream_class->close_async  = g_buffered_output_stream_close_async;
133   ostream_class->close_finish = g_buffered_output_stream_close_finish;
134
135   g_object_class_install_property (object_class,
136                                    PROP_BUFSIZE,
137                                    g_param_spec_uint ("buffer-size",
138                                                       P_("Buffer Size"),
139                                                       P_("The size of the backend buffer"),
140                                                       1,
141                                                       G_MAXUINT,
142                                                       DEFAULT_BUFFER_SIZE,
143                                                       G_PARAM_READWRITE|G_PARAM_CONSTRUCT|
144                                                       G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
145
146   g_object_class_install_property (object_class,
147                                    PROP_AUTO_GROW,
148                                    g_param_spec_boolean ("auto-grow",
149                                                          P_("Auto-grow"),
150                                                          P_("Whether the buffer should automatically grow"),
151                                                          FALSE,
152                                                          G_PARAM_READWRITE|
153                                                          G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
154
155 }
156
157 /**
158  * g_buffered_output_stream_get_buffer_size:
159  * @stream: a #GBufferedOutputStream.
160  * 
161  * Gets the size of the buffer in the @stream.
162  * 
163  * Returns: the current size of the buffer.
164  **/
165 gsize
166 g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream)
167 {
168   g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1);
169
170   return stream->priv->len;
171 }
172
173 /**
174  * g_buffered_output_stream_set_buffer_size:
175  * @stream: a #GBufferedOutputStream.
176  * @size: a #gsize.
177  *
178  * Sets the size of the internal buffer to @size.
179  **/    
180 void
181 g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream,
182                                           gsize                  size)
183 {
184   GBufferedOutputStreamPrivate *priv;
185   guint8 *buffer;
186   
187   g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
188
189   priv = stream->priv;
190   
191   if (size == priv->len)
192     return;
193
194   if (priv->buffer)
195     {
196       size = MAX (size, priv->pos);
197
198       buffer = g_malloc (size);
199       memcpy (buffer, priv->buffer, priv->pos);
200       g_free (priv->buffer);
201       priv->buffer = buffer;
202       priv->len = size;
203       /* Keep old pos */
204     }
205   else
206     {
207       priv->buffer = g_malloc (size);
208       priv->len = size;
209       priv->pos = 0;
210     }
211
212   g_object_notify (G_OBJECT (stream), "buffer-size");
213 }
214
215 /**
216  * g_buffered_output_stream_get_auto_grow:
217  * @stream: a #GBufferedOutputStream.
218  * 
219  * Checks if the buffer automatically grows as data is added.
220  * 
221  * Returns: %TRUE if the @stream's buffer automatically grows,
222  * %FALSE otherwise.
223  **/  
224 gboolean
225 g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream)
226 {
227   g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE);
228
229   return stream->priv->auto_grow;
230 }
231
232 /**
233  * g_buffered_output_stream_set_auto_grow:
234  * @stream: a #GBufferedOutputStream.
235  * @auto_grow: a #gboolean.
236  *
237  * Sets whether or not the @stream's buffer should automatically grow.
238  * If @auto_grow is true, then each write will just make the buffer
239  * larger, and you must manually flush the buffer to actually write out
240  * the data to the underlying stream.
241  **/
242 void
243 g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream,
244                                         gboolean               auto_grow)
245 {
246   GBufferedOutputStreamPrivate *priv;
247   g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
248   priv = stream->priv;
249   auto_grow = auto_grow != FALSE;
250   if (priv->auto_grow != auto_grow)
251     {
252       priv->auto_grow = auto_grow;
253       g_object_notify (G_OBJECT (stream), "auto-grow");
254     }
255 }
256
257 static void
258 g_buffered_output_stream_set_property (GObject      *object,
259                                        guint         prop_id,
260                                        const GValue *value,
261                                        GParamSpec   *pspec)
262 {
263   GBufferedOutputStream *stream;
264
265   stream = G_BUFFERED_OUTPUT_STREAM (object);
266
267   switch (prop_id) 
268     {
269     case PROP_BUFSIZE:
270       g_buffered_output_stream_set_buffer_size (stream, g_value_get_uint (value));
271       break;    
272
273     case PROP_AUTO_GROW:
274       g_buffered_output_stream_set_auto_grow (stream, g_value_get_boolean (value));
275       break;
276
277     default:
278       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
279       break;
280     }
281
282 }
283
284 static void
285 g_buffered_output_stream_get_property (GObject    *object,
286                                        guint       prop_id,
287                                        GValue     *value,
288                                        GParamSpec *pspec)
289 {
290   GBufferedOutputStream *buffered_stream;
291   GBufferedOutputStreamPrivate *priv;
292
293   buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
294   priv = buffered_stream->priv;
295
296   switch (prop_id)
297     {
298     case PROP_BUFSIZE:
299       g_value_set_uint (value, priv->len);
300       break;
301
302     case PROP_AUTO_GROW:
303       g_value_set_boolean (value, priv->auto_grow);
304       break;
305
306     default:
307       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
308       break;
309     }
310
311 }
312
313 static void
314 g_buffered_output_stream_finalize (GObject *object)
315 {
316   GBufferedOutputStream *stream;
317   GBufferedOutputStreamPrivate *priv;
318
319   stream = G_BUFFERED_OUTPUT_STREAM (object);
320   priv = stream->priv;
321
322   g_free (priv->buffer);
323
324   G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize (object);
325 }
326
327 static void
328 g_buffered_output_stream_init (GBufferedOutputStream *stream)
329 {
330   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
331                                               G_TYPE_BUFFERED_OUTPUT_STREAM,
332                                               GBufferedOutputStreamPrivate);
333
334 }
335
336 /**
337  * g_buffered_output_stream_new:
338  * @base_stream: a #GOutputStream.
339  * 
340  * Creates a new buffered output stream for a base stream.
341  * 
342  * Returns: a #GOutputStream for the given @base_stream.
343  **/  
344 GOutputStream *
345 g_buffered_output_stream_new (GOutputStream *base_stream)
346 {
347   GOutputStream *stream;
348
349   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
350
351   stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
352                          "base-stream", base_stream,
353                          NULL);
354
355   return stream;
356 }
357
358 /**
359  * g_buffered_output_stream_new_sized:
360  * @base_stream: a #GOutputStream.
361  * @size: a #gsize.
362  * 
363  * Creates a new buffered output stream with a given buffer size.
364  * 
365  * Returns: a #GOutputStream with an internal buffer set to @size.
366  **/  
367 GOutputStream *
368 g_buffered_output_stream_new_sized (GOutputStream *base_stream,
369                                     gsize          size)
370 {
371   GOutputStream *stream;
372
373   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
374
375   stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
376                          "base-stream", base_stream,
377                          "buffer-size", size,
378                          NULL);
379
380   return stream;
381 }
382
383 static gboolean
384 flush_buffer (GBufferedOutputStream  *stream,
385               GCancellable           *cancellable,
386               GError                 **error)
387 {
388   GBufferedOutputStreamPrivate *priv;
389   GOutputStream                *base_stream;
390   gboolean                      res;
391   gsize                         bytes_written;
392   gsize                         count;
393
394   priv = stream->priv;
395   bytes_written = 0;
396   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
397
398   g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE);
399
400   res = g_output_stream_write_all (base_stream,
401                                    priv->buffer,
402                                    priv->pos,
403                                    &bytes_written,
404                                    cancellable,
405                                    error);
406
407   count = priv->pos - bytes_written;
408
409   if (count > 0)
410     g_memmove (priv->buffer, priv->buffer + bytes_written, count);
411   
412   priv->pos -= bytes_written;
413
414   return res;
415 }
416
417 static gssize
418 g_buffered_output_stream_write  (GOutputStream *stream,
419                                  const void    *buffer,
420                                  gsize          count,
421                                  GCancellable  *cancellable,
422                                  GError       **error)
423 {
424   GBufferedOutputStream        *bstream;
425   GBufferedOutputStreamPrivate *priv;
426   gboolean res;
427   gsize    n;
428   gsize new_size;
429
430   bstream = G_BUFFERED_OUTPUT_STREAM (stream);
431   priv = bstream->priv;
432
433   n = priv->len - priv->pos;
434
435   if (priv->auto_grow && n < count)
436     {
437       new_size = MAX (priv->len * 2, priv->len + count);
438       g_buffered_output_stream_set_buffer_size (bstream, new_size);
439     }
440   else if (n == 0)
441     {
442       res = flush_buffer (bstream, cancellable, error);
443       
444       if (res == FALSE)
445         return -1;
446     }
447
448   n = priv->len - priv->pos;
449   
450   count = MIN (count, n);
451   memcpy (priv->buffer + priv->pos, buffer, count);
452   priv->pos += count;
453
454   return count;
455 }
456
457 static gboolean
458 g_buffered_output_stream_flush (GOutputStream  *stream,
459                                 GCancellable   *cancellable,
460                                 GError        **error)
461 {
462   GBufferedOutputStream *bstream;
463   GOutputStream                *base_stream;
464   gboolean res;
465
466   bstream = G_BUFFERED_OUTPUT_STREAM (stream);
467   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
468
469   res = flush_buffer (bstream, cancellable, error);
470
471   if (res == FALSE)
472     return FALSE;
473
474   res = g_output_stream_flush (base_stream, cancellable, error);
475
476   return res;
477 }
478
479 static gboolean
480 g_buffered_output_stream_close (GOutputStream  *stream,
481                                 GCancellable   *cancellable,
482                                 GError        **error)
483 {
484   GBufferedOutputStream        *bstream;
485   GOutputStream                *base_stream;
486   gboolean                      res;
487
488   bstream = G_BUFFERED_OUTPUT_STREAM (stream);
489   base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream;
490   res = flush_buffer (bstream, cancellable, error);
491
492   if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream)))
493     {
494       /* report the first error but still close the stream */
495       if (res)
496         res = g_output_stream_close (base_stream, cancellable, error);
497       else
498         g_output_stream_close (base_stream, cancellable, NULL);
499     }
500
501   return res;
502 }
503
504 /* ************************** */
505 /* Async stuff implementation */
506 /* ************************** */
507
508 /* TODO: This should be using the base class async ops, not threads */
509
510 typedef struct {
511
512   guint flush_stream : 1;
513   guint close_stream : 1;
514
515 } FlushData;
516
517 static void
518 free_flush_data (gpointer data)
519 {
520   g_slice_free (FlushData, data);
521 }
522
523 /* This function is used by all three (i.e. 
524  * _write, _flush, _close) functions since
525  * all of them will need to flush the buffer
526  * and so closing and writing is just a special
527  * case of flushing + some addition stuff */
528 static void
529 flush_buffer_thread (GSimpleAsyncResult *result,
530                      GObject            *object,
531                      GCancellable       *cancellable)
532 {
533   GBufferedOutputStream *stream;
534   GOutputStream *base_stream;
535   FlushData     *fdata;
536   gboolean       res;
537   GError        *error = NULL;
538
539   stream = G_BUFFERED_OUTPUT_STREAM (object);
540   fdata = g_simple_async_result_get_op_res_gpointer (result);
541   base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
542
543   res = flush_buffer (stream, cancellable, &error);
544
545   /* if flushing the buffer didn't work don't even bother
546    * to flush the stream but just report that error */
547   if (res && fdata->flush_stream)
548     res = g_output_stream_flush (base_stream, cancellable, &error);
549
550   if (fdata->close_stream) 
551     {
552      
553       /* if flushing the buffer or the stream returned 
554        * an error report that first error but still try 
555        * close the stream */
556       if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream)))
557         {
558           if (res == FALSE)
559             g_output_stream_close (base_stream, cancellable, NULL);
560           else 
561             res = g_output_stream_close (base_stream, cancellable, &error);
562         }
563     }
564
565   if (res == FALSE)
566     g_simple_async_result_take_error (result, error);
567 }
568
569 static void
570 g_buffered_output_stream_flush_async (GOutputStream        *stream,
571                                       int                   io_priority,
572                                       GCancellable         *cancellable,
573                                       GAsyncReadyCallback   callback,
574                                       gpointer              data)
575 {
576   GSimpleAsyncResult *res;
577   FlushData          *fdata;
578
579   fdata = g_slice_new (FlushData);
580   fdata->flush_stream = TRUE;
581   fdata->close_stream = FALSE;
582
583   res = g_simple_async_result_new (G_OBJECT (stream),
584                                    callback,
585                                    data,
586                                    g_buffered_output_stream_flush_async);
587
588   g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
589
590   g_simple_async_result_run_in_thread (res, 
591                                        flush_buffer_thread, 
592                                        io_priority,
593                                        cancellable);
594   g_object_unref (res);
595 }
596
597 static gboolean
598 g_buffered_output_stream_flush_finish (GOutputStream        *stream,
599                                        GAsyncResult         *result,
600                                        GError              **error)
601 {
602   GSimpleAsyncResult *simple;
603
604   simple = G_SIMPLE_ASYNC_RESULT (result);
605
606   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == 
607             g_buffered_output_stream_flush_async);
608
609   return TRUE;
610 }
611
612 static void
613 g_buffered_output_stream_close_async (GOutputStream        *stream,
614                                       int                   io_priority,
615                                       GCancellable         *cancellable,
616                                       GAsyncReadyCallback   callback,
617                                       gpointer              data)
618 {
619   GSimpleAsyncResult *res;
620   FlushData          *fdata;
621
622   fdata = g_slice_new (FlushData);
623   fdata->close_stream = TRUE;
624
625   res = g_simple_async_result_new (G_OBJECT (stream),
626                                    callback,
627                                    data,
628                                    g_buffered_output_stream_close_async);
629
630   g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
631
632   g_simple_async_result_run_in_thread (res, 
633                                        flush_buffer_thread, 
634                                        io_priority,
635                                        cancellable);
636   g_object_unref (res);
637 }
638
639 static gboolean
640 g_buffered_output_stream_close_finish (GOutputStream        *stream,
641                                        GAsyncResult         *result,
642                                        GError              **error)
643 {
644   GSimpleAsyncResult *simple;
645
646   simple = G_SIMPLE_ASYNC_RESULT (result);
647
648   g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == 
649             g_buffered_output_stream_close_async);
650
651   return TRUE;
652 }