1 /* GIO - GLib Input, Output and Streaming Library
3 * Copyright (C) 2006-2007 Red Hat, Inc.
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18 * Boston, MA 02111-1307, USA.
20 * Author: Christian Kellner <gicmo@gnome.org>
24 #include "gbufferedoutputstream.h"
25 #include "goutputstream.h"
26 #include "gsimpleasyncresult.h"
33 * SECTION:gbufferedoutputstream
34 * @short_description: Buffered Output Stream
35 * @see_also: #GFilterOutputStream, #GOutputStream
37 * Buffered output stream implements #GFilterOutputStream and provides
38 * for buffered writes.
40 * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes.
42 * To create a buffered output stream, use g_buffered_output_stream_new(),
43 * or g_buffered_output_stream_new_sized() to specify the buffer's size
46 * To get the size of a buffer within a buffered output stream, use
47 * g_buffered_output_stream_get_buffer_size(). To change the size of a
48 * buffered output stream's buffer, use
49 * g_buffered_output_stream_set_buffer_size(). Note that the buffer's
50 * size cannot be reduced below the size of the data within the buffer.
/* 4096 bytes: the same figure as the 4 kilobyte default buffer size that the
 * section documentation for this type states.  NOTE(review): the place where
 * this constant is consumed is not visible in this listing. */
53 #define DEFAULT_BUFFER_SIZE 4096
/* Per-instance private state.  The functions in this file access
 * priv->buffer, priv->len, priv->pos and priv->auto_grow; the member
 * declarations themselves are not visible in this listing. */
55 struct _GBufferedOutputStreamPrivate {
/* GObject property accessors and finalizer. */
68 static void g_buffered_output_stream_set_property (GObject *object,
73 static void g_buffered_output_stream_get_property (GObject *object,
77 static void g_buffered_output_stream_finalize (GObject *object);
/* Synchronous GOutputStream operations. */
80 static gssize g_buffered_output_stream_write (GOutputStream *stream,
83 GCancellable *cancellable,
85 static gboolean g_buffered_output_stream_flush (GOutputStream *stream,
86 GCancellable *cancellable,
88 static gboolean g_buffered_output_stream_close (GOutputStream *stream,
89 GCancellable *cancellable,
/* Asynchronous GOutputStream operations, each an _async/_finish pair. */
92 static void g_buffered_output_stream_write_async (GOutputStream *stream,
96 GCancellable *cancellable,
97 GAsyncReadyCallback callback,
99 static gssize g_buffered_output_stream_write_finish (GOutputStream *stream,
100 GAsyncResult *result,
102 static void g_buffered_output_stream_flush_async (GOutputStream *stream,
104 GCancellable *cancellable,
105 GAsyncReadyCallback callback,
107 static gboolean g_buffered_output_stream_flush_finish (GOutputStream *stream,
108 GAsyncResult *result,
110 static void g_buffered_output_stream_close_async (GOutputStream *stream,
112 GCancellable *cancellable,
113 GAsyncReadyCallback callback,
115 static gboolean g_buffered_output_stream_close_finish (GOutputStream *stream,
116 GAsyncResult *result,
/* Define the GBufferedOutputStream type with G_TYPE_FILTER_OUTPUT_STREAM as
 * its parent type. */
119 G_DEFINE_TYPE (GBufferedOutputStream,
120 g_buffered_output_stream,
121 G_TYPE_FILTER_OUTPUT_STREAM)
/* Class initializer: reserves the private struct, installs the GObject
 * property and finalize handlers, points the GOutputStream virtual methods
 * at the buffered implementations in this file, and registers the
 * "buffer-size" and "auto-grow" properties. */
125 g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass)
127 GObjectClass *object_class;
128 GOutputStreamClass *ostream_class;
/* Reserve room for a GBufferedOutputStreamPrivate in every instance. */
130 g_type_class_add_private (klass, sizeof (GBufferedOutputStreamPrivate));
/* GObject hooks. */
132 object_class = G_OBJECT_CLASS (klass);
133 object_class->get_property = g_buffered_output_stream_get_property;
134 object_class->set_property = g_buffered_output_stream_set_property;
135 object_class->finalize = g_buffered_output_stream_finalize;
/* Stream hooks: three synchronous operations followed by their
 * asynchronous start/finish counterparts. */
137 ostream_class = G_OUTPUT_STREAM_CLASS (klass);
138 ostream_class->write_fn = g_buffered_output_stream_write;
139 ostream_class->flush = g_buffered_output_stream_flush;
140 ostream_class->close_fn = g_buffered_output_stream_close;
141 ostream_class->write_async = g_buffered_output_stream_write_async;
142 ostream_class->write_finish = g_buffered_output_stream_write_finish;
143 ostream_class->flush_async = g_buffered_output_stream_flush_async;
144 ostream_class->flush_finish = g_buffered_output_stream_flush_finish;
145 ostream_class->close_async = g_buffered_output_stream_close_async;
146 ostream_class->close_finish = g_buffered_output_stream_close_finish;
/* "buffer-size": unsigned integer property, readable and writable, and also
 * applied during construction (G_PARAM_CONSTRUCT).  NOTE(review): the
 * property id, nick, range and default are not visible in this listing. */
148 g_object_class_install_property (object_class,
150 g_param_spec_uint ("buffer-size",
152 P_("The size of the backend buffer"),
156 G_PARAM_READWRITE|G_PARAM_CONSTRUCT|
157 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
/* "auto-grow": boolean property.  NOTE(review): only the static-string
 * flags are visible here; the access flags are on a line not shown. */
159 g_object_class_install_property (object_class,
161 g_param_spec_boolean ("auto-grow",
163 P_("Whether the buffer should automatically grow"),
166 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
171 * g_buffered_output_stream_get_buffer_size:
172 * @stream: a #GBufferedOutputStream.
174 * Gets the size of the buffer in the @stream.
176 * Returns: the current size of the buffer.
/* Returns priv->len, the current size of the internal buffer.  If the
 * argument is not a GBufferedOutputStream the type guard returns -1
 * instead. */
179 g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream)
181 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1);
183 return stream->priv->len;
187 * g_buffered_output_stream_set_buffer_size:
188 * @stream: a #GBufferedOutputStream.
191 * Sets the size of the internal buffer to @size.
/* Resizes the internal buffer and emits a "buffer-size" property
 * notification.  Two allocation paths are visible below: one copies the
 * priv->pos bytes that are already buffered into a new allocation and frees
 * the old one, the other simply allocates a fresh buffer.
 * NOTE(review): the branch conditions and the updates of priv->len and
 * priv->pos are not visible in this listing; confirm them against the
 * complete file. */
194 g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream,
197 GBufferedOutputStreamPrivate *priv;
200 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
/* Compare the request with the current size (presumably an early exit when
 * nothing would change; the statement that follows is not visible). */
204 if (size == priv->len)
/* Never go below the amount of data already held in the buffer. */
209 size = MAX (size, priv->pos);
/* Carry the buffered bytes over to the new allocation, then release the
 * old one. */
211 buffer = g_malloc (size);
212 memcpy (buffer, priv->buffer, priv->pos);
213 g_free (priv->buffer);
214 priv->buffer = buffer;
/* Fresh allocation with nothing to copy. */
220 priv->buffer = g_malloc (size);
/* Let property listeners know the size changed. */
225 g_object_notify (G_OBJECT (stream), "buffer-size");
229 * g_buffered_output_stream_get_auto_grow:
230 * @stream: a #GBufferedOutputStream.
232 * Checks if the buffer automatically grows as data is added.
234 * Returns: %TRUE if the @stream's buffer automatically grows,
/* Returns priv->auto_grow.  If the argument is not a GBufferedOutputStream
 * the type guard returns FALSE. */
238 g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream)
240 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE);
242 return stream->priv->auto_grow;
246 * g_buffered_output_stream_set_auto_grow:
247 * @stream: a #GBufferedOutputStream.
248 * @auto_grow: a #gboolean.
250 * Sets whether or not the @stream's buffer should automatically grow.
/* Stores the auto-grow flag and emits an "auto-grow" property notification,
 * but only when the stored value actually changes. */
253 g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream,
256 GBufferedOutputStreamPrivate *priv;
257 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
/* Collapse any non-zero argument to 1 so the inequality test below compares
 * like with like. */
259 auto_grow = auto_grow != FALSE;
260 if (priv->auto_grow != auto_grow)
262 priv->auto_grow = auto_grow;
263 g_object_notify (G_OBJECT (stream), "auto-grow");
/* GObject set_property handler: forwards the incoming GValue to the public
 * setter for the matching property.  NOTE(review): the dispatch on prop_id
 * is not visible in this listing. */
268 g_buffered_output_stream_set_property (GObject *object,
273 GBufferedOutputStream *stream;
275 stream = G_BUFFERED_OUTPUT_STREAM (object);
/* The buffer size is carried as an unsigned integer GValue. */
280 g_buffered_output_stream_set_buffer_size (stream, g_value_get_uint (value));
/* The auto-grow flag is carried as a boolean GValue. */
284 g_buffered_output_stream_set_auto_grow (stream, g_value_get_boolean (value));
/* Anything else is reported with the standard GObject warning. */
288 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject get_property handler: copies the requested field of the private
 * struct into the caller's GValue.  NOTE(review): the dispatch on prop_id is
 * not visible in this listing. */
295 g_buffered_output_stream_get_property (GObject *object,
300 GBufferedOutputStream *buffered_stream;
301 GBufferedOutputStreamPrivate *priv;
303 buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
304 priv = buffered_stream->priv;
/* Buffer size is reported from priv->len. */
309 g_value_set_uint (value, priv->len);
/* Auto-grow is reported from priv->auto_grow. */
313 g_value_set_boolean (value, priv->auto_grow);
/* Anything else is reported with the standard GObject warning. */
317 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject finalizer: frees the internal buffer and then chains up to the
 * parent class finalizer when the parent provides one. */
324 g_buffered_output_stream_finalize (GObject *object)
326 GBufferedOutputStream *stream;
327 GBufferedOutputStreamPrivate *priv;
329 stream = G_BUFFERED_OUTPUT_STREAM (object);
332 g_free (priv->buffer);
/* Chain up. */
334 if (G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize)
335 (*G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize) (object);
/* Instance initializer: looks up the instance's private struct once and
 * caches the pointer in stream->priv, which the rest of the file uses. */
339 g_buffered_output_stream_init (GBufferedOutputStream *stream)
341 stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
342 G_TYPE_BUFFERED_OUTPUT_STREAM,
343 GBufferedOutputStreamPrivate);
348 * g_buffered_output_stream_new:
349 * @base_stream: a #GOutputStream.
351 * Creates a new buffered output stream for a base stream.
353 * Returns: a #GOutputStream for the given @base_stream.
/* Constructs a G_TYPE_BUFFERED_OUTPUT_STREAM whose "base-stream" property is
 * base_stream.  Returns NULL, via the guard, when base_stream is not a
 * GOutputStream.  NOTE(review): the rest of the g_object_new() argument list
 * and the return statement are not visible in this listing. */
356 g_buffered_output_stream_new (GOutputStream *base_stream)
358 GOutputStream *stream;
360 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
362 stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
363 "base-stream", base_stream,
370 * g_buffered_output_stream_new_sized:
371 * @base_stream: a #GOutputStream.
374 * Creates a new buffered output stream with a given buffer size.
376 * Returns: a #GOutputStream with an internal buffer set to @size.
/* Same construction as g_buffered_output_stream_new(), for callers that
 * choose the buffer size themselves.  Returns NULL, via the guard, when
 * base_stream is not a GOutputStream.  NOTE(review): the size parameter and
 * the way it is handed to g_object_new() (presumably as "buffer-size") are
 * not visible in this listing. */
379 g_buffered_output_stream_new_sized (GOutputStream *base_stream,
382 GOutputStream *stream;
384 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
386 stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
387 "base-stream", base_stream,
/* Pushes the buffered bytes to the base stream with
 * g_output_stream_write_all(), then keeps whatever was not written: the
 * unwritten tail is moved to the front of the buffer and priv->pos is
 * lowered by the number of bytes written.  Returns FALSE through the guard
 * when the base stream is not a GOutputStream.
 * NOTE(review): the write_all arguments and the final return are not visible
 * in this listing. */
395 flush_buffer (GBufferedOutputStream *stream,
396 GCancellable *cancellable,
399 GBufferedOutputStreamPrivate *priv;
400 GOutputStream *base_stream;
407 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
409 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE);
411 res = g_output_stream_write_all (base_stream,
/* Number of buffered bytes still waiting after the write. */
418 count = priv->pos - bytes_written;
/* Source and destination can overlap, hence a memmove-style copy. */
421 g_memmove (priv->buffer, priv->buffer + bytes_written, count);
423 priv->pos -= bytes_written;
/* Synchronous write: copies at most as many bytes as fit in the free part
 * of the internal buffer, so fewer than count bytes may be accepted.  When
 * auto-grow is on and the free space is smaller than count, the buffer is
 * enlarged first; flush_buffer() is also called on a path whose condition
 * is not visible in this listing.
 * NOTE(review): the update of priv->pos and the return value are likewise
 * not visible here. */
429 g_buffered_output_stream_write (GOutputStream *stream,
432 GCancellable *cancellable,
435 GBufferedOutputStream *bstream;
436 GBufferedOutputStreamPrivate *priv;
441 bstream = G_BUFFERED_OUTPUT_STREAM (stream);
442 priv = bstream->priv;
/* Free space currently left in the buffer. */
444 n = priv->len - priv->pos;
446 if (priv->auto_grow && n < count)
/* Grow to the larger of double the current size and the current size plus
 * the incoming byte count. */
448 new_size = MAX (priv->len * 2, priv->len + count);
449 g_buffered_output_stream_set_buffer_size (bstream, new_size);
453 res = flush_buffer (bstream, cancellable, error);
/* Recompute the free space, since growing or flushing may have changed it. */
459 n = priv->len - priv->pos;
/* Clamp the request to what fits and append it at the write position. */
461 count = MIN (count, n);
462 memcpy (priv->buffer + priv->pos, buffer, count);
/* Synchronous flush: first empties the internal buffer into the base stream
 * with flush_buffer(), then flushes the base stream itself.
 * NOTE(review): the check between the two calls and the return statement
 * are not visible in this listing. */
469 g_buffered_output_stream_flush (GOutputStream *stream,
470 GCancellable *cancellable,
473 GBufferedOutputStream *bstream;
474 GBufferedOutputStreamPrivate *priv;
475 GOutputStream *base_stream;
478 bstream = G_BUFFERED_OUTPUT_STREAM (stream);
479 priv = bstream->priv;
480 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
482 res = flush_buffer (bstream, cancellable, error);
487 res = g_output_stream_flush (base_stream, cancellable, error);
/* Synchronous close: empties the internal buffer into the base stream and
 * then closes the base stream.  Two close calls are visible: one that
 * reports its outcome through res and error, and one that passes NULL for
 * the error so that an error reported earlier is the one the caller sees.
 * NOTE(review): the condition selecting between the two calls and the
 * return statement are not visible in this listing. */
493 g_buffered_output_stream_close (GOutputStream *stream,
494 GCancellable *cancellable,
497 GBufferedOutputStream *bstream;
498 GBufferedOutputStreamPrivate *priv;
499 GOutputStream *base_stream;
502 bstream = G_BUFFERED_OUTPUT_STREAM (stream);
503 priv = bstream->priv;
504 base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream;
506 res = flush_buffer (bstream, cancellable, error);
508 /* report the first error but still close the stream */
510 res = g_output_stream_close (base_stream, cancellable, error);
512 g_output_stream_close (base_stream, cancellable, NULL);
517 /* ************************** */
518 /* Async stuff implementation */
519 /* ************************** */
521 /* TODO: This should be using the base class async ops, not threads */
/* Request flags read by flush_buffer_thread(). */
/* Set: after the buffer was flushed successfully, also flush the base
 * stream. */
525 guint flush_stream : 1;
/* Set: close the base stream at the end of the operation. */
526 guint close_stream : 1;
/* Destroy callback for async results that carry a FlushData: returns the
 * struct to the slice allocator. */
531 free_flush_data (gpointer data)
533 g_slice_free (FlushData, data);
536 /* This function is used by all three (i.e.
537 * _write, _flush, _close) functions since
538 * all of them will need to flush the buffer
539 * and so closing and writing is just a special
540 * case of flushing + some additional stuff */
/* Worker run for the asynchronous operations.  It empties the internal
 * buffer into the base stream and then, depending on the FlushData flags
 * attached to the result, flushes and/or closes the base stream.  An error
 * is copied into the async result and then freed.
 * NOTE(review): the conditions guarding the two close calls and the error
 * hand-over are not visible in this listing. */
542 flush_buffer_thread (GSimpleAsyncResult *result,
544 GCancellable *cancellable)
546 GBufferedOutputStream *stream;
547 GOutputStream *base_stream;
550 GError *error = NULL;
552 stream = G_BUFFERED_OUTPUT_STREAM (object);
/* The flags travel as the result's op_res pointer. */
553 fdata = g_simple_async_result_get_op_res_gpointer (result);
554 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
556 res = flush_buffer (stream, cancellable, &error);
558 /* if flushing the buffer didn't work don't even bother
559 * to flush the stream but just report that error */
560 if (res && fdata->flush_stream)
561 res = g_output_stream_flush (base_stream, cancellable, &error);
563 if (fdata->close_stream)
566 /* if flushing the buffer or the stream returned
567 * an error, report that first error but still try
568 * to close the stream */
570 g_output_stream_close (base_stream, cancellable, NULL);
572 res = g_output_stream_close (base_stream, cancellable, &error);
/* Hand the error to the async result, which keeps its own copy, and drop
 * the local one. */
577 g_simple_async_result_set_from_error (result, error);
578 g_error_free (error);
/* Destroy callback for async results that carry a WriteData: returns the
 * struct to the slice allocator. */
592 free_write_data (gpointer data)
594 g_slice_free (WriteData, data);
/* Starts an asynchronous write.  The caller's buffer pointer and byte count
 * are only recorded here, in a WriteData attached to the async result; the
 * existing comment below says the copy into the internal buffer happens in
 * write_finish.  With free space in the buffer the result completes from an
 * idle callback; otherwise a worker thread is scheduled with both FlushData
 * flags cleared.
 * NOTE(review): because only the pointer is stored, the caller's buffer
 * presumably has to stay valid until write_finish runs; confirm. */
598 g_buffered_output_stream_write_async (GOutputStream *stream,
602 GCancellable *cancellable,
603 GAsyncReadyCallback callback,
606 GBufferedOutputStream *buffered_stream;
607 GBufferedOutputStreamPrivate *priv;
608 GSimpleAsyncResult *res;
611 buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
612 priv = buffered_stream->priv;
614 wdata = g_slice_new (WriteData);
615 wdata->count = count;
616 wdata->buffer = buffer;
/* The source tag is this function, which write_finish checks. */
618 res = g_simple_async_result_new (G_OBJECT (stream),
621 g_buffered_output_stream_write_async);
/* The result owns wdata and frees it through free_write_data. */
623 g_simple_async_result_set_op_res_gpointer (res, wdata, free_write_data);
625 /* if we have space left directly call the
626 * callback (from idle) otherwise schedule a buffer
627 * flush in the thread. In both cases the actual
628 * copying of the data to the buffer will be done in
629 * the write_finish () func since that should
631 if (priv->len - priv->pos > 0)
633 g_simple_async_result_complete_in_idle (res);
637 wdata->fdata.flush_stream = FALSE;
638 wdata->fdata.close_stream = FALSE;
639 g_simple_async_result_run_in_thread (res,
643 g_object_unref (res);
/* Completes an asynchronous write: copies the bytes recorded by write_async
 * into the internal buffer at the current write position, limited to the
 * free space that is left.
 * NOTE(review): the update of priv->pos and the return value are not
 * visible in this listing. */
648 g_buffered_output_stream_write_finish (GOutputStream *stream,
649 GAsyncResult *result,
652 GBufferedOutputStreamPrivate *priv;
653 GBufferedOutputStream *buffered_stream;
654 GSimpleAsyncResult *simple;
658 simple = G_SIMPLE_ASYNC_RESULT (result);
659 buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
660 priv = buffered_stream->priv;
/* Warn, without aborting, if the result was not created by write_async. */
662 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
663 g_buffered_output_stream_write_async);
665 wdata = g_simple_async_result_get_op_res_gpointer (simple);
667 /* Now do the real copying of data to the buffer */
668 count = priv->len - priv->pos;
669 count = MIN (wdata->count, count);
671 memcpy (priv->buffer + priv->pos, wdata->buffer, count);
/* Starts an asynchronous flush: builds a FlushData asking for a flush of
 * the base stream but not a close, attaches it to a new async result tagged
 * with this function, and runs the work in a thread. */
679 g_buffered_output_stream_flush_async (GOutputStream *stream,
681 GCancellable *cancellable,
682 GAsyncReadyCallback callback,
685 GSimpleAsyncResult *res;
688 fdata = g_slice_new (FlushData);
689 fdata->flush_stream = TRUE;
690 fdata->close_stream = FALSE;
692 res = g_simple_async_result_new (G_OBJECT (stream),
695 g_buffered_output_stream_flush_async);
/* The result owns fdata and frees it through free_flush_data. */
697 g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
699 g_simple_async_result_run_in_thread (res,
/* Drop the local reference taken by g_simple_async_result_new(). */
703 g_object_unref (res);
/* Completes an asynchronous flush.  The only visible work is a sanity check
 * that the result carries the flush_async source tag; a mismatch produces a
 * warning rather than a failure.
 * NOTE(review): the return statement is not visible in this listing. */
707 g_buffered_output_stream_flush_finish (GOutputStream *stream,
708 GAsyncResult *result,
711 GSimpleAsyncResult *simple;
713 simple = G_SIMPLE_ASYNC_RESULT (result);
715 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
716 g_buffered_output_stream_flush_async);
/* Starts an asynchronous close: builds a FlushData asking for the base
 * stream to be closed, attaches it to a new async result tagged with this
 * function, and runs the work in a thread. */
722 g_buffered_output_stream_close_async (GOutputStream *stream,
724 GCancellable *cancellable,
725 GAsyncReadyCallback callback,
728 GSimpleAsyncResult *res;
/* Allocate zero-filled.  Only close_stream is assigned here, yet the worker
 * also reads flush_stream; zeroing makes that flag a defined FALSE, which
 * matches the synchronous close path as far as it is visible (buffer flush
 * followed by close, no separate flush of the base stream). */
731 fdata = g_slice_new0 (FlushData);
732 fdata->close_stream = TRUE;
734 res = g_simple_async_result_new (G_OBJECT (stream),
737 g_buffered_output_stream_close_async);
/* The result owns fdata and frees it through free_flush_data. */
739 g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
741 g_simple_async_result_run_in_thread (res,
/* Drop the local reference taken by g_simple_async_result_new(). */
745 g_object_unref (res);
/* Completes an asynchronous close.  The only visible work is a sanity check
 * that the result was created by g_buffered_output_stream_close_async(),
 * which is the source tag that function stores; a mismatch produces a
 * warning rather than a failure.
 * NOTE(review): the return statement is not visible in this listing. */
749 g_buffered_output_stream_close_finish (GOutputStream *stream,
750 GAsyncResult *result,
753 GSimpleAsyncResult *simple;
755 simple = G_SIMPLE_ASYNC_RESULT (result);
757 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
758 g_buffered_output_stream_close_async);
763 #define __G_BUFFERED_OUTPUT_STREAM_C__
764 #include "gioaliasdef.c"