1 /* GIO - GLib Input, Output and Streaming Library
3 * Copyright (C) 2006-2007 Red Hat, Inc.
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18 * Boston, MA 02111-1307, USA.
20 * Author: Christian Kellner <gicmo@gnome.org>
24 #include "gbufferedoutputstream.h"
25 #include "goutputstream.h"
26 #include "gsimpleasyncresult.h"
33 * SECTION:gbufferedoutputstream
34 * @short_description: Buffered Output Stream
36 * @see_also: #GFilterOutputStream, #GOutputStream
38 * Buffered output stream implements #GFilterOutputStream and provides
39 * for buffered writes.
41 * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes.
43 * To create a buffered output stream, use g_buffered_output_stream_new(),
44 * or g_buffered_output_stream_new_sized() to specify the buffer's size
47 * To get the size of a buffer within a buffered input stream, use
48 * g_buffered_output_stream_get_buffer_size(). To change the size of a
49 * buffered output stream's buffer, use
50 * g_buffered_output_stream_set_buffer_size(). Note that the buffer's
51 * size cannot be reduced below the size of the data within the buffer.
54 #define DEFAULT_BUFFER_SIZE 4096
56 struct _GBufferedOutputStreamPrivate {
69 static void g_buffered_output_stream_set_property (GObject *object,
74 static void g_buffered_output_stream_get_property (GObject *object,
78 static void g_buffered_output_stream_finalize (GObject *object);
81 static gssize g_buffered_output_stream_write (GOutputStream *stream,
84 GCancellable *cancellable,
86 static gboolean g_buffered_output_stream_flush (GOutputStream *stream,
87 GCancellable *cancellable,
89 static gboolean g_buffered_output_stream_close (GOutputStream *stream,
90 GCancellable *cancellable,
93 static void g_buffered_output_stream_write_async (GOutputStream *stream,
97 GCancellable *cancellable,
98 GAsyncReadyCallback callback,
100 static gssize g_buffered_output_stream_write_finish (GOutputStream *stream,
101 GAsyncResult *result,
103 static void g_buffered_output_stream_flush_async (GOutputStream *stream,
105 GCancellable *cancellable,
106 GAsyncReadyCallback callback,
108 static gboolean g_buffered_output_stream_flush_finish (GOutputStream *stream,
109 GAsyncResult *result,
111 static void g_buffered_output_stream_close_async (GOutputStream *stream,
113 GCancellable *cancellable,
114 GAsyncReadyCallback callback,
116 static gboolean g_buffered_output_stream_close_finish (GOutputStream *stream,
117 GAsyncResult *result,
120 G_DEFINE_TYPE (GBufferedOutputStream,
121 g_buffered_output_stream,
122 G_TYPE_FILTER_OUTPUT_STREAM)
126 g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass)
128 GObjectClass *object_class;
129 GOutputStreamClass *ostream_class;
131 g_type_class_add_private (klass, sizeof (GBufferedOutputStreamPrivate));
133 object_class = G_OBJECT_CLASS (klass);
134 object_class->get_property = g_buffered_output_stream_get_property;
135 object_class->set_property = g_buffered_output_stream_set_property;
136 object_class->finalize = g_buffered_output_stream_finalize;
138 ostream_class = G_OUTPUT_STREAM_CLASS (klass);
139 ostream_class->write_fn = g_buffered_output_stream_write;
140 ostream_class->flush = g_buffered_output_stream_flush;
141 ostream_class->close_fn = g_buffered_output_stream_close;
142 ostream_class->write_async = g_buffered_output_stream_write_async;
143 ostream_class->write_finish = g_buffered_output_stream_write_finish;
144 ostream_class->flush_async = g_buffered_output_stream_flush_async;
145 ostream_class->flush_finish = g_buffered_output_stream_flush_finish;
146 ostream_class->close_async = g_buffered_output_stream_close_async;
147 ostream_class->close_finish = g_buffered_output_stream_close_finish;
149 g_object_class_install_property (object_class,
151 g_param_spec_uint ("buffer-size",
153 P_("The size of the backend buffer"),
157 G_PARAM_READWRITE|G_PARAM_CONSTRUCT|
158 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
160 g_object_class_install_property (object_class,
162 g_param_spec_boolean ("auto-grow",
164 P_("Whether the buffer should automatically grow"),
167 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
172 * g_buffered_output_stream_get_buffer_size:
173 * @stream: a #GBufferedOutputStream.
175 * Gets the size of the buffer in the @stream.
177 * Returns: the current size of the buffer.
180 g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream)
182 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1);
184 return stream->priv->len;
188 * g_buffered_output_stream_set_buffer_size:
189 * @stream: a #GBufferedOutputStream.
192 * Sets the size of the internal buffer to @size.
195 g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream,
198 GBufferedOutputStreamPrivate *priv;
201 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
205 if (size == priv->len)
210 size = MAX (size, priv->pos);
212 buffer = g_malloc (size);
213 memcpy (buffer, priv->buffer, priv->pos);
214 g_free (priv->buffer);
215 priv->buffer = buffer;
221 priv->buffer = g_malloc (size);
226 g_object_notify (G_OBJECT (stream), "buffer-size");
230 * g_buffered_output_stream_get_auto_grow:
231 * @stream: a #GBufferedOutputStream.
233 * Checks if the buffer automatically grows as data is added.
235 * Returns: %TRUE if the @stream's buffer automatically grows,
239 g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream)
241 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE);
243 return stream->priv->auto_grow;
247 * g_buffered_output_stream_set_auto_grow:
248 * @stream: a #GBufferedOutputStream.
249 * @auto_grow: a #gboolean.
251 * Sets whether or not the @stream's buffer should automatically grow.
252 * If @auto_grow is true, then each write will just make the buffer
253 * larger, and you must manually flush the buffer to actually write out
254 * the data to the underlying stream.
257 g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream,
260 GBufferedOutputStreamPrivate *priv;
261 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
263 auto_grow = auto_grow != FALSE;
264 if (priv->auto_grow != auto_grow)
266 priv->auto_grow = auto_grow;
267 g_object_notify (G_OBJECT (stream), "auto-grow");
272 g_buffered_output_stream_set_property (GObject *object,
277 GBufferedOutputStream *stream;
279 stream = G_BUFFERED_OUTPUT_STREAM (object);
284 g_buffered_output_stream_set_buffer_size (stream, g_value_get_uint (value));
288 g_buffered_output_stream_set_auto_grow (stream, g_value_get_boolean (value));
292 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
299 g_buffered_output_stream_get_property (GObject *object,
304 GBufferedOutputStream *buffered_stream;
305 GBufferedOutputStreamPrivate *priv;
307 buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
308 priv = buffered_stream->priv;
313 g_value_set_uint (value, priv->len);
317 g_value_set_boolean (value, priv->auto_grow);
321 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
328 g_buffered_output_stream_finalize (GObject *object)
330 GBufferedOutputStream *stream;
331 GBufferedOutputStreamPrivate *priv;
333 stream = G_BUFFERED_OUTPUT_STREAM (object);
336 g_free (priv->buffer);
338 if (G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize)
339 (*G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize) (object);
343 g_buffered_output_stream_init (GBufferedOutputStream *stream)
345 stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
346 G_TYPE_BUFFERED_OUTPUT_STREAM,
347 GBufferedOutputStreamPrivate);
352 * g_buffered_output_stream_new:
353 * @base_stream: a #GOutputStream.
355 * Creates a new buffered output stream for a base stream.
357 * Returns: a #GOutputStream for the given @base_stream.
360 g_buffered_output_stream_new (GOutputStream *base_stream)
362 GOutputStream *stream;
364 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
366 stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
367 "base-stream", base_stream,
374 * g_buffered_output_stream_new_sized:
375 * @base_stream: a #GOutputStream.
378 * Creates a new buffered output stream with a given buffer size.
380 * Returns: a #GOutputStream with an internal buffer set to @size.
383 g_buffered_output_stream_new_sized (GOutputStream *base_stream,
386 GOutputStream *stream;
388 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
390 stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
391 "base-stream", base_stream,
399 flush_buffer (GBufferedOutputStream *stream,
400 GCancellable *cancellable,
403 GBufferedOutputStreamPrivate *priv;
404 GOutputStream *base_stream;
411 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
413 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE);
415 res = g_output_stream_write_all (base_stream,
422 count = priv->pos - bytes_written;
425 g_memmove (priv->buffer, priv->buffer + bytes_written, count);
427 priv->pos -= bytes_written;
433 g_buffered_output_stream_write (GOutputStream *stream,
436 GCancellable *cancellable,
439 GBufferedOutputStream *bstream;
440 GBufferedOutputStreamPrivate *priv;
445 bstream = G_BUFFERED_OUTPUT_STREAM (stream);
446 priv = bstream->priv;
448 n = priv->len - priv->pos;
450 if (priv->auto_grow && n < count)
452 new_size = MAX (priv->len * 2, priv->len + count);
453 g_buffered_output_stream_set_buffer_size (bstream, new_size);
457 res = flush_buffer (bstream, cancellable, error);
463 n = priv->len - priv->pos;
465 count = MIN (count, n);
466 memcpy (priv->buffer + priv->pos, buffer, count);
473 g_buffered_output_stream_flush (GOutputStream *stream,
474 GCancellable *cancellable,
477 GBufferedOutputStream *bstream;
478 GBufferedOutputStreamPrivate *priv;
479 GOutputStream *base_stream;
482 bstream = G_BUFFERED_OUTPUT_STREAM (stream);
483 priv = bstream->priv;
484 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
486 res = flush_buffer (bstream, cancellable, error);
491 res = g_output_stream_flush (base_stream, cancellable, error);
497 g_buffered_output_stream_close (GOutputStream *stream,
498 GCancellable *cancellable,
501 GBufferedOutputStream *bstream;
502 GBufferedOutputStreamPrivate *priv;
503 GOutputStream *base_stream;
506 bstream = G_BUFFERED_OUTPUT_STREAM (stream);
507 priv = bstream->priv;
508 base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream;
510 res = flush_buffer (bstream, cancellable, error);
512 /* report the first error but still close the stream */
514 res = g_output_stream_close (base_stream, cancellable, error);
516 g_output_stream_close (base_stream, cancellable, NULL);
521 /* ************************** */
522 /* Async stuff implementation */
523 /* ************************** */
525 /* TODO: This should be using the base class async ops, not threads */
529 guint flush_stream : 1;
530 guint close_stream : 1;
535 free_flush_data (gpointer data)
537 g_slice_free (FlushData, data);
540 /* This function is used by all three (i.e.
541 * _write, _flush, _close) functions since
542 * all of them will need to flush the buffer
543 * and so closing and writing is just a special
544 * case of flushing + some addition stuff */
546 flush_buffer_thread (GSimpleAsyncResult *result,
548 GCancellable *cancellable)
550 GBufferedOutputStream *stream;
551 GOutputStream *base_stream;
554 GError *error = NULL;
556 stream = G_BUFFERED_OUTPUT_STREAM (object);
557 fdata = g_simple_async_result_get_op_res_gpointer (result);
558 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
560 res = flush_buffer (stream, cancellable, &error);
562 /* if flushing the buffer didn't work don't even bother
563 * to flush the stream but just report that error */
564 if (res && fdata->flush_stream)
565 res = g_output_stream_flush (base_stream, cancellable, &error);
567 if (fdata->close_stream)
570 /* if flushing the buffer or the stream returned
571 * an error report that first error but still try
572 * close the stream */
574 g_output_stream_close (base_stream, cancellable, NULL);
576 res = g_output_stream_close (base_stream, cancellable, &error);
581 g_simple_async_result_set_from_error (result, error);
582 g_error_free (error);
596 free_write_data (gpointer data)
598 g_slice_free (WriteData, data);
602 g_buffered_output_stream_write_async (GOutputStream *stream,
606 GCancellable *cancellable,
607 GAsyncReadyCallback callback,
610 GBufferedOutputStream *buffered_stream;
611 GBufferedOutputStreamPrivate *priv;
612 GSimpleAsyncResult *res;
615 buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
616 priv = buffered_stream->priv;
618 wdata = g_slice_new (WriteData);
619 wdata->count = count;
620 wdata->buffer = buffer;
622 res = g_simple_async_result_new (G_OBJECT (stream),
625 g_buffered_output_stream_write_async);
627 g_simple_async_result_set_op_res_gpointer (res, wdata, free_write_data);
629 /* if we have space left directly call the
630 * callback (from idle) otherwise schedule a buffer
631 * flush in the thread. In both cases the actual
632 * copying of the data to the buffer will be done in
633 * the write_finish () func since that should
635 if (priv->len - priv->pos > 0)
637 g_simple_async_result_complete_in_idle (res);
641 wdata->fdata.flush_stream = FALSE;
642 wdata->fdata.close_stream = FALSE;
643 g_simple_async_result_run_in_thread (res,
647 g_object_unref (res);
652 g_buffered_output_stream_write_finish (GOutputStream *stream,
653 GAsyncResult *result,
656 GBufferedOutputStreamPrivate *priv;
657 GBufferedOutputStream *buffered_stream;
658 GSimpleAsyncResult *simple;
662 simple = G_SIMPLE_ASYNC_RESULT (result);
663 buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
664 priv = buffered_stream->priv;
666 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
667 g_buffered_output_stream_write_async);
669 wdata = g_simple_async_result_get_op_res_gpointer (simple);
671 /* Now do the real copying of data to the buffer */
672 count = priv->len - priv->pos;
673 count = MIN (wdata->count, count);
675 memcpy (priv->buffer + priv->pos, wdata->buffer, count);
683 g_buffered_output_stream_flush_async (GOutputStream *stream,
685 GCancellable *cancellable,
686 GAsyncReadyCallback callback,
689 GSimpleAsyncResult *res;
692 fdata = g_slice_new (FlushData);
693 fdata->flush_stream = TRUE;
694 fdata->close_stream = FALSE;
696 res = g_simple_async_result_new (G_OBJECT (stream),
699 g_buffered_output_stream_flush_async);
701 g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
703 g_simple_async_result_run_in_thread (res,
707 g_object_unref (res);
711 g_buffered_output_stream_flush_finish (GOutputStream *stream,
712 GAsyncResult *result,
715 GSimpleAsyncResult *simple;
717 simple = G_SIMPLE_ASYNC_RESULT (result);
719 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
720 g_buffered_output_stream_flush_async);
726 g_buffered_output_stream_close_async (GOutputStream *stream,
728 GCancellable *cancellable,
729 GAsyncReadyCallback callback,
732 GSimpleAsyncResult *res;
735 fdata = g_slice_new (FlushData);
736 fdata->close_stream = TRUE;
738 res = g_simple_async_result_new (G_OBJECT (stream),
741 g_buffered_output_stream_close_async);
743 g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
745 g_simple_async_result_run_in_thread (res,
749 g_object_unref (res);
753 g_buffered_output_stream_close_finish (GOutputStream *stream,
754 GAsyncResult *result,
757 GSimpleAsyncResult *simple;
759 simple = G_SIMPLE_ASYNC_RESULT (result);
761 g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
762 g_buffered_output_stream_flush_async);
767 #define __G_BUFFERED_OUTPUT_STREAM_C__
768 #include "gioaliasdef.c"