1 /* GIO - GLib Input, Output and Streaming Library
3 * Copyright (C) 2006-2007 Red Hat, Inc.
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18 * Boston, MA 02111-1307, USA.
20 * Author: Christian Kellner <gicmo@gnome.org>
24 #include "gbufferedoutputstream.h"
25 #include "goutputstream.h"
26 #include "gsimpleasyncresult.h"
31 * SECTION:gbufferedoutputstream
32 * @short_description: Buffered Output Stream
33 * @see_also: #GFilterOutputStream, #GOutputStream
35 * Buffered output stream implements #GFilterOutputStream and provides
36 * for buffered writes.
38 * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes.
40 * To create a buffered output stream, use g_buffered_output_stream_new(), or
41 * g_buffered_output_stream_new_sized() to specify the buffer's size at construction.
43 * To get the size of a buffer within a buffered input stream, use
44 * g_buffered_output_stream_get_buffer_size(). To change the size of a
45 * buffered output stream's buffer, use g_buffered_output_stream_set_buffer_size().
46 * Note: the buffer's size cannot be reduced below the size of the data within the
53 #define DEFAULT_BUFFER_SIZE 4096
55 struct _GBufferedOutputStreamPrivate {
67 static void g_buffered_output_stream_set_property (GObject *object,
72 static void g_buffered_output_stream_get_property (GObject *object,
76 static void g_buffered_output_stream_finalize (GObject *object);
79 static gssize g_buffered_output_stream_write (GOutputStream *stream,
82 GCancellable *cancellable,
84 static gboolean g_buffered_output_stream_flush (GOutputStream *stream,
85 GCancellable *cancellable,
87 static gboolean g_buffered_output_stream_close (GOutputStream *stream,
88 GCancellable *cancellable,
91 static void g_buffered_output_stream_write_async (GOutputStream *stream,
95 GCancellable *cancellable,
96 GAsyncReadyCallback callback,
98 static gssize g_buffered_output_stream_write_finish (GOutputStream *stream,
101 static void g_buffered_output_stream_flush_async (GOutputStream *stream,
103 GCancellable *cancellable,
104 GAsyncReadyCallback callback,
106 static gboolean g_buffered_output_stream_flush_finish (GOutputStream *stream,
107 GAsyncResult *result,
109 static void g_buffered_output_stream_close_async (GOutputStream *stream,
111 GCancellable *cancellable,
112 GAsyncReadyCallback callback,
114 static gboolean g_buffered_output_stream_close_finish (GOutputStream *stream,
115 GAsyncResult *result,
118 G_DEFINE_TYPE (GBufferedOutputStream,
119 g_buffered_output_stream,
120 G_TYPE_FILTER_OUTPUT_STREAM)
124 g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass)
126 GObjectClass *object_class;
127 GOutputStreamClass *ostream_class;
129 g_type_class_add_private (klass, sizeof (GBufferedOutputStreamPrivate));
131 object_class = G_OBJECT_CLASS (klass);
132 object_class->get_property = g_buffered_output_stream_get_property;
133 object_class->set_property = g_buffered_output_stream_set_property;
134 object_class->finalize = g_buffered_output_stream_finalize;
136 ostream_class = G_OUTPUT_STREAM_CLASS (klass);
137 ostream_class->write = g_buffered_output_stream_write;
138 ostream_class->flush = g_buffered_output_stream_flush;
139 ostream_class->close = g_buffered_output_stream_close;
140 ostream_class->write_async = g_buffered_output_stream_write_async;
141 ostream_class->write_finish = g_buffered_output_stream_write_finish;
142 ostream_class->flush_async = g_buffered_output_stream_flush_async;
143 ostream_class->flush_finish = g_buffered_output_stream_flush_finish;
144 ostream_class->close_async = g_buffered_output_stream_close_async;
145 ostream_class->close_finish = g_buffered_output_stream_close_finish;
147 g_object_class_install_property (object_class,
149 g_param_spec_uint ("buffer-size",
151 P_("The size of the backend buffer"),
155 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY |
156 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
161 * g_buffered_output_stream_get_buffer_size:
162 * @stream: a #GBufferedOutputStream.
164 * Gets the size of the buffer in the @stream.
166 * Returns: the current size of the buffer.
169 g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream)
171 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1);
173 return stream->priv->len;
177 * g_buffered_output_stream_set_buffer_size:
178 * @stream: a #GBufferedOutputStream.
181 * Sets the size of the internal buffer to @size.
184 g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream,
187 GBufferedOutputStreamPrivate *priv;
190 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
196 size = MAX (size, priv->pos);
198 buffer = g_malloc (size);
199 memcpy (buffer, priv->buffer, priv->pos);
200 g_free (priv->buffer);
201 priv->buffer = buffer;
207 priv->buffer = g_malloc (size);
214 * g_buffered_output_stream_get_auto_grow:
215 * @stream: a #GBufferedOutputStream.
217 * Checks if the buffer automatically grows as data is added.
219 * Returns: %TRUE if the @stream's buffer automatically grows,
223 g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream)
225 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE);
227 return stream->priv->auto_grow;
231 * g_buffered_output_stream_set_auto_grow:
232 * @stream: a #GBufferedOutputStream.
233 * @auto_grow: a #gboolean.
235 * Sets whether or not the @stream's buffer should automatically grow.
238 g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream,
241 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
243 stream->priv->auto_grow = auto_grow;
247 g_buffered_output_stream_set_property (GObject *object,
252 GBufferedOutputStream *buffered_stream;
253 GBufferedOutputStreamPrivate *priv;
255 buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
256 priv = buffered_stream->priv;
262 g_buffered_output_stream_set_buffer_size (buffered_stream, g_value_get_uint (value));
266 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
273 g_buffered_output_stream_get_property (GObject *object,
278 GBufferedOutputStream *buffered_stream;
279 GBufferedOutputStreamPrivate *priv;
281 buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
282 priv = buffered_stream->priv;
288 g_value_set_uint (value, priv->len);
292 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
299 g_buffered_output_stream_finalize (GObject *object)
301 GBufferedOutputStream *stream;
302 GBufferedOutputStreamPrivate *priv;
304 stream = G_BUFFERED_OUTPUT_STREAM (object);
307 g_free (priv->buffer);
309 if (G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize)
310 (*G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize) (object);
314 g_buffered_output_stream_init (GBufferedOutputStream *stream)
316 stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
317 G_TYPE_BUFFERED_OUTPUT_STREAM,
318 GBufferedOutputStreamPrivate);
323 * g_buffered_output_stream_new:
324 * @base_stream: a #GOutputStream.
326 * Creates a new buffered output stream for a base stream.
328 * Returns: a #GOutputStream for the given @base_stream.
331 g_buffered_output_stream_new (GOutputStream *base_stream)
333 GOutputStream *stream;
335 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
337 stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
338 "base-stream", base_stream,
345 * g_buffered_output_stream_new_sized:
346 * @base_stream: a #GOutputStream.
349 * Creates a new buffered output stream with a given buffer size.
351 * Returns: a #GOutputStream with an internal buffer set to @size.
354 g_buffered_output_stream_new_sized (GOutputStream *base_stream,
357 GOutputStream *stream;
359 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
361 stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
362 "base-stream", base_stream,
370 flush_buffer (GBufferedOutputStream *stream,
371 GCancellable *cancellable,
374 GBufferedOutputStreamPrivate *priv;
375 GOutputStream *base_stream;
382 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
384 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE);
386 res = g_output_stream_write_all (base_stream,
393 count = priv->pos - bytes_written;
396 g_memmove (priv->buffer, priv->buffer + bytes_written, count);
398 priv->pos -= bytes_written;
404 g_buffered_output_stream_write (GOutputStream *stream,
407 GCancellable *cancellable,
410 GBufferedOutputStream *bstream;
411 GBufferedOutputStreamPrivate *priv;
416 bstream = G_BUFFERED_OUTPUT_STREAM (stream);
417 priv = bstream->priv;
419 n = priv->len - priv->pos;
421 if (priv->auto_grow && n < count)
423 new_size = MAX (priv->len * 2, priv->len + count);
424 g_buffered_output_stream_set_buffer_size (bstream, new_size);
428 res = flush_buffer (bstream, cancellable, error);
434 n = priv->len - priv->pos;
436 count = MIN (count, n);
437 memcpy (priv->buffer + priv->pos, buffer, count);
444 g_buffered_output_stream_flush (GOutputStream *stream,
445 GCancellable *cancellable,
448 GBufferedOutputStream *bstream;
449 GBufferedOutputStreamPrivate *priv;
450 GOutputStream *base_stream;
453 bstream = G_BUFFERED_OUTPUT_STREAM (stream);
454 priv = bstream->priv;
455 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
457 res = flush_buffer (bstream, cancellable, error);
463 res = g_output_stream_flush (base_stream,
470 g_buffered_output_stream_close (GOutputStream *stream,
471 GCancellable *cancellable,
474 GBufferedOutputStream *bstream;
475 GBufferedOutputStreamPrivate *priv;
476 GOutputStream *base_stream;
479 bstream = G_BUFFERED_OUTPUT_STREAM (stream);
480 priv = bstream->priv;
481 base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream;
483 res = flush_buffer (bstream, cancellable, error);
485 /* report the first error but still close the stream */
488 res = g_output_stream_close (base_stream,
494 g_output_stream_close (base_stream,
502 /* ************************** */
503 /* Async stuff implementation */
504 /* ************************** */
506 /* TODO: This should be using the base class async ops, not threads */
510 guint flush_stream : 1;
511 guint close_stream : 1;
516 free_flush_data (gpointer data)
518 g_slice_free (FlushData, data);
521 /* This function is used by all three (i.e.
522 * _write, _flush, _close) functions since
523 * all of them will need to flush the buffer
524 * and so closing and writing is just a special
525 * case of flushing + some addition stuff */
527 flush_buffer_thread (GSimpleAsyncResult *result,
529 GCancellable *cancellable)
531 GBufferedOutputStream *stream;
532 GOutputStream *base_stream;
535 GError *error = NULL;
537 stream = G_BUFFERED_OUTPUT_STREAM (object);
538 fdata = g_simple_async_result_get_op_res_gpointer (result);
539 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
541 res = flush_buffer (stream, cancellable, &error);
543 /* if flushing the buffer didn't work don't even bother
544 * to flush the stream but just report that error */
545 if (res && fdata->flush_stream)
547 res = g_output_stream_flush (base_stream,
552 if (fdata->close_stream)
555 /* if flushing the buffer or the stream returned
556 * an error report that first error but still try
557 * close the stream */
560 g_output_stream_close (base_stream,
566 res = g_output_stream_close (base_stream,
575 g_simple_async_result_set_from_error (result, error);
576 g_error_free (error);
590 free_write_data (gpointer data)
592 g_slice_free (WriteData, data);
596 g_buffered_output_stream_write_async (GOutputStream *stream,
600 GCancellable *cancellable,
601 GAsyncReadyCallback callback,
604 GBufferedOutputStream *buffered_stream;
605 GBufferedOutputStreamPrivate *priv;
606 GSimpleAsyncResult *res;
609 buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
610 priv = buffered_stream->priv;
612 wdata = g_slice_new (WriteData);
613 wdata->count = count;
614 wdata->buffer = buffer;
616 res = g_simple_async_result_new (G_OBJECT (stream),
619 g_buffered_output_stream_write_async);
621 g_simple_async_result_set_op_res_gpointer (res, wdata, free_write_data);
623 /* if we have space left directly call the
624 * callback (from idle) otherwise schedule a buffer
625 * flush in the thread. In both cases the actual
626 * copying of the data to the buffer will be done in
627 * the write_finish () func since that should
629 if (priv->len - priv->pos > 0)
631 g_simple_async_result_complete_in_idle (res);
635 wdata->fdata.flush_stream = FALSE;
636 wdata->fdata.close_stream = FALSE;
637 g_simple_async_result_run_in_thread (res,
641 g_object_unref (res);
646 g_buffered_output_stream_write_finish (GOutputStream *stream,
647 GAsyncResult *result,
650 GBufferedOutputStreamPrivate *priv;
651 GBufferedOutputStream *buffered_stream;
652 GSimpleAsyncResult *simple;
656 simple = G_SIMPLE_ASYNC_RESULT (result);
657 buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
658 priv = buffered_stream->priv;
660 g_assert (g_simple_async_result_get_source_tag (simple) ==
661 g_buffered_output_stream_write_async);
663 wdata = g_simple_async_result_get_op_res_gpointer (simple);
665 /* Now do the real copying of data to the buffer */
666 count = priv->len - priv->pos;
667 count = MIN (wdata->count, count);
669 memcpy (priv->buffer + priv->pos, wdata->buffer, count);
677 g_buffered_output_stream_flush_async (GOutputStream *stream,
679 GCancellable *cancellable,
680 GAsyncReadyCallback callback,
683 GSimpleAsyncResult *res;
686 fdata = g_slice_new (FlushData);
687 fdata->flush_stream = TRUE;
688 fdata->close_stream = FALSE;
690 res = g_simple_async_result_new (G_OBJECT (stream),
693 g_buffered_output_stream_flush_async);
695 g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
697 g_simple_async_result_run_in_thread (res,
701 g_object_unref (res);
705 g_buffered_output_stream_flush_finish (GOutputStream *stream,
706 GAsyncResult *result,
709 GSimpleAsyncResult *simple;
711 simple = G_SIMPLE_ASYNC_RESULT (result);
713 g_assert (g_simple_async_result_get_source_tag (simple) ==
714 g_buffered_output_stream_flush_async);
720 g_buffered_output_stream_close_async (GOutputStream *stream,
722 GCancellable *cancellable,
723 GAsyncReadyCallback callback,
726 GSimpleAsyncResult *res;
729 fdata = g_slice_new (FlushData);
730 fdata->close_stream = TRUE;
732 res = g_simple_async_result_new (G_OBJECT (stream),
735 g_buffered_output_stream_close_async);
737 g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);
739 g_simple_async_result_run_in_thread (res,
743 g_object_unref (res);
747 g_buffered_output_stream_close_finish (GOutputStream *stream,
748 GAsyncResult *result,
751 GSimpleAsyncResult *simple;
753 simple = G_SIMPLE_ASYNC_RESULT (result);
755 g_assert (g_simple_async_result_get_source_tag (simple) ==
756 g_buffered_output_stream_flush_async);
761 /* vim: ts=2 sw=2 et */