1 /* GIO - GLib Input, Output and Streaming Library
3 * Copyright (C) 2006-2007 Red Hat, Inc.
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
18 * Author: Christian Kellner <gicmo@gnome.org>
22 #include "gbufferedoutputstream.h"
23 #include "goutputstream.h"
24 #include "gseekable.h"
31 * SECTION:gbufferedoutputstream
32 * @short_description: Buffered Output Stream
34 * @see_also: #GFilterOutputStream, #GOutputStream
36 * Buffered output stream implements #GFilterOutputStream and provides
37 * for buffered writes.
39 * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes.
41 * To create a buffered output stream, use g_buffered_output_stream_new(),
42 * or g_buffered_output_stream_new_sized() to specify the buffer's size
45 * To get the size of a buffer within a buffered output stream, use
46 * g_buffered_output_stream_get_buffer_size(). To change the size of a
47 * buffered output stream's buffer, use
48 * g_buffered_output_stream_set_buffer_size(). Note that the buffer's
49 * size cannot be reduced below the size of the data within the buffer.
52 #define DEFAULT_BUFFER_SIZE 4096
54 struct _GBufferedOutputStreamPrivate {
67 static void g_buffered_output_stream_set_property (GObject *object,
72 static void g_buffered_output_stream_get_property (GObject *object,
76 static void g_buffered_output_stream_finalize (GObject *object);
79 static gssize g_buffered_output_stream_write (GOutputStream *stream,
82 GCancellable *cancellable,
84 static gboolean g_buffered_output_stream_flush (GOutputStream *stream,
85 GCancellable *cancellable,
87 static gboolean g_buffered_output_stream_close (GOutputStream *stream,
88 GCancellable *cancellable,
91 static void g_buffered_output_stream_flush_async (GOutputStream *stream,
93 GCancellable *cancellable,
94 GAsyncReadyCallback callback,
96 static gboolean g_buffered_output_stream_flush_finish (GOutputStream *stream,
99 static void g_buffered_output_stream_close_async (GOutputStream *stream,
101 GCancellable *cancellable,
102 GAsyncReadyCallback callback,
104 static gboolean g_buffered_output_stream_close_finish (GOutputStream *stream,
105 GAsyncResult *result,
108 static void g_buffered_output_stream_seekable_iface_init (GSeekableIface *iface);
109 static goffset g_buffered_output_stream_tell (GSeekable *seekable);
110 static gboolean g_buffered_output_stream_can_seek (GSeekable *seekable);
111 static gboolean g_buffered_output_stream_seek (GSeekable *seekable,
114 GCancellable *cancellable,
116 static gboolean g_buffered_output_stream_can_truncate (GSeekable *seekable);
117 static gboolean g_buffered_output_stream_truncate (GSeekable *seekable,
119 GCancellable *cancellable,
122 G_DEFINE_TYPE_WITH_CODE (GBufferedOutputStream,
123 g_buffered_output_stream,
124 G_TYPE_FILTER_OUTPUT_STREAM,
125 G_ADD_PRIVATE (GBufferedOutputStream)
126 G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
127 g_buffered_output_stream_seekable_iface_init))
131 g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass)
133 GObjectClass *object_class;
134 GOutputStreamClass *ostream_class;
136 object_class = G_OBJECT_CLASS (klass);
137 object_class->get_property = g_buffered_output_stream_get_property;
138 object_class->set_property = g_buffered_output_stream_set_property;
139 object_class->finalize = g_buffered_output_stream_finalize;
141 ostream_class = G_OUTPUT_STREAM_CLASS (klass);
142 ostream_class->write_fn = g_buffered_output_stream_write;
143 ostream_class->flush = g_buffered_output_stream_flush;
144 ostream_class->close_fn = g_buffered_output_stream_close;
145 ostream_class->flush_async = g_buffered_output_stream_flush_async;
146 ostream_class->flush_finish = g_buffered_output_stream_flush_finish;
147 ostream_class->close_async = g_buffered_output_stream_close_async;
148 ostream_class->close_finish = g_buffered_output_stream_close_finish;
150 g_object_class_install_property (object_class,
152 g_param_spec_uint ("buffer-size",
154 P_("The size of the backend buffer"),
158 G_PARAM_READWRITE|G_PARAM_CONSTRUCT|
159 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
161 g_object_class_install_property (object_class,
163 g_param_spec_boolean ("auto-grow",
165 P_("Whether the buffer should automatically grow"),
168 G_PARAM_STATIC_NAME|G_PARAM_STATIC_NICK|G_PARAM_STATIC_BLURB));
173 * g_buffered_output_stream_get_buffer_size:
174 * @stream: a #GBufferedOutputStream.
176 * Gets the size of the buffer in the @stream.
178 * Returns: the current size of the buffer.
181 g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream)
183 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1);
185 return stream->priv->len;
189 * g_buffered_output_stream_set_buffer_size:
190 * @stream: a #GBufferedOutputStream.
193 * Sets the size of the internal buffer to @size.
196 g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream,
199 GBufferedOutputStreamPrivate *priv;
202 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
206 if (size == priv->len)
211 size = MAX (size, priv->pos);
213 buffer = g_malloc (size);
214 memcpy (buffer, priv->buffer, priv->pos);
215 g_free (priv->buffer);
216 priv->buffer = buffer;
222 priv->buffer = g_malloc (size);
227 g_object_notify (G_OBJECT (stream), "buffer-size");
231 * g_buffered_output_stream_get_auto_grow:
232 * @stream: a #GBufferedOutputStream.
234 * Checks if the buffer automatically grows as data is added.
236 * Returns: %TRUE if the @stream's buffer automatically grows,
240 g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream)
242 g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE);
244 return stream->priv->auto_grow;
248 * g_buffered_output_stream_set_auto_grow:
249 * @stream: a #GBufferedOutputStream.
250 * @auto_grow: a #gboolean.
252 * Sets whether or not the @stream's buffer should automatically grow.
253 * If @auto_grow is true, then each write will just make the buffer
254 * larger, and you must manually flush the buffer to actually write out
255 * the data to the underlying stream.
258 g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream,
261 GBufferedOutputStreamPrivate *priv;
262 g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));
264 auto_grow = auto_grow != FALSE;
265 if (priv->auto_grow != auto_grow)
267 priv->auto_grow = auto_grow;
268 g_object_notify (G_OBJECT (stream), "auto-grow");
273 g_buffered_output_stream_set_property (GObject *object,
278 GBufferedOutputStream *stream;
280 stream = G_BUFFERED_OUTPUT_STREAM (object);
285 g_buffered_output_stream_set_buffer_size (stream, g_value_get_uint (value));
289 g_buffered_output_stream_set_auto_grow (stream, g_value_get_boolean (value));
293 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
300 g_buffered_output_stream_get_property (GObject *object,
305 GBufferedOutputStream *buffered_stream;
306 GBufferedOutputStreamPrivate *priv;
308 buffered_stream = G_BUFFERED_OUTPUT_STREAM (object);
309 priv = buffered_stream->priv;
314 g_value_set_uint (value, priv->len);
318 g_value_set_boolean (value, priv->auto_grow);
322 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
329 g_buffered_output_stream_finalize (GObject *object)
331 GBufferedOutputStream *stream;
332 GBufferedOutputStreamPrivate *priv;
334 stream = G_BUFFERED_OUTPUT_STREAM (object);
337 g_free (priv->buffer);
339 G_OBJECT_CLASS (g_buffered_output_stream_parent_class)->finalize (object);
343 g_buffered_output_stream_init (GBufferedOutputStream *stream)
345 stream->priv = g_buffered_output_stream_get_instance_private (stream);
349 g_buffered_output_stream_seekable_iface_init (GSeekableIface *iface)
351 iface->tell = g_buffered_output_stream_tell;
352 iface->can_seek = g_buffered_output_stream_can_seek;
353 iface->seek = g_buffered_output_stream_seek;
354 iface->can_truncate = g_buffered_output_stream_can_truncate;
355 iface->truncate_fn = g_buffered_output_stream_truncate;
359 * g_buffered_output_stream_new:
360 * @base_stream: a #GOutputStream.
362 * Creates a new buffered output stream for a base stream.
364 * Returns: a #GOutputStream for the given @base_stream.
367 g_buffered_output_stream_new (GOutputStream *base_stream)
369 GOutputStream *stream;
371 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
373 stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
374 "base-stream", base_stream,
381 * g_buffered_output_stream_new_sized:
382 * @base_stream: a #GOutputStream.
385 * Creates a new buffered output stream with a given buffer size.
387 * Returns: a #GOutputStream with an internal buffer set to @size.
390 g_buffered_output_stream_new_sized (GOutputStream *base_stream,
393 GOutputStream *stream;
395 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);
397 stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
398 "base-stream", base_stream,
406 flush_buffer (GBufferedOutputStream *stream,
407 GCancellable *cancellable,
410 GBufferedOutputStreamPrivate *priv;
411 GOutputStream *base_stream;
418 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
420 g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE);
422 res = g_output_stream_write_all (base_stream,
429 count = priv->pos - bytes_written;
432 memmove (priv->buffer, priv->buffer + bytes_written, count);
434 priv->pos -= bytes_written;
440 g_buffered_output_stream_write (GOutputStream *stream,
443 GCancellable *cancellable,
446 GBufferedOutputStream *bstream;
447 GBufferedOutputStreamPrivate *priv;
452 bstream = G_BUFFERED_OUTPUT_STREAM (stream);
453 priv = bstream->priv;
455 n = priv->len - priv->pos;
457 if (priv->auto_grow && n < count)
459 new_size = MAX (priv->len * 2, priv->len + count);
460 g_buffered_output_stream_set_buffer_size (bstream, new_size);
464 res = flush_buffer (bstream, cancellable, error);
470 n = priv->len - priv->pos;
472 count = MIN (count, n);
473 memcpy (priv->buffer + priv->pos, buffer, count);
480 g_buffered_output_stream_flush (GOutputStream *stream,
481 GCancellable *cancellable,
484 GBufferedOutputStream *bstream;
485 GOutputStream *base_stream;
488 bstream = G_BUFFERED_OUTPUT_STREAM (stream);
489 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
491 res = flush_buffer (bstream, cancellable, error);
496 res = g_output_stream_flush (base_stream, cancellable, error);
502 g_buffered_output_stream_close (GOutputStream *stream,
503 GCancellable *cancellable,
506 GBufferedOutputStream *bstream;
507 GOutputStream *base_stream;
510 bstream = G_BUFFERED_OUTPUT_STREAM (stream);
511 base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream;
512 res = flush_buffer (bstream, cancellable, error);
514 if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream)))
516 /* report the first error but still close the stream */
518 res = g_output_stream_close (base_stream, cancellable, error);
520 g_output_stream_close (base_stream, cancellable, NULL);
527 g_buffered_output_stream_tell (GSeekable *seekable)
529 GBufferedOutputStream *bstream;
530 GBufferedOutputStreamPrivate *priv;
531 GOutputStream *base_stream;
532 GSeekable *base_stream_seekable;
535 bstream = G_BUFFERED_OUTPUT_STREAM (seekable);
536 priv = bstream->priv;
538 base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
539 if (!G_IS_SEEKABLE (base_stream))
542 base_stream_seekable = G_SEEKABLE (base_stream);
544 base_offset = g_seekable_tell (base_stream_seekable);
545 return base_offset + priv->pos;
549 g_buffered_output_stream_can_seek (GSeekable *seekable)
551 GOutputStream *base_stream;
553 base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
554 return G_IS_SEEKABLE (base_stream) && g_seekable_can_seek (G_SEEKABLE (base_stream));
558 g_buffered_output_stream_seek (GSeekable *seekable,
561 GCancellable *cancellable,
564 GBufferedOutputStream *bstream;
565 GOutputStream *base_stream;
566 GSeekable *base_stream_seekable;
569 bstream = G_BUFFERED_OUTPUT_STREAM (seekable);
571 base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
572 if (!G_IS_SEEKABLE (base_stream))
574 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
575 _("Seek not supported on base stream"));
579 base_stream_seekable = G_SEEKABLE (base_stream);
580 flushed = flush_buffer (bstream, cancellable, error);
584 return g_seekable_seek (base_stream_seekable, offset, type, cancellable, error);
588 g_buffered_output_stream_can_truncate (GSeekable *seekable)
590 GOutputStream *base_stream;
592 base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
593 return G_IS_SEEKABLE (base_stream) && g_seekable_can_truncate (G_SEEKABLE (base_stream));
597 g_buffered_output_stream_truncate (GSeekable *seekable,
599 GCancellable *cancellable,
602 GBufferedOutputStream *bstream;
603 GOutputStream *base_stream;
604 GSeekable *base_stream_seekable;
607 bstream = G_BUFFERED_OUTPUT_STREAM (seekable);
608 base_stream = G_FILTER_OUTPUT_STREAM (seekable)->base_stream;
609 if (!G_IS_SEEKABLE (base_stream))
611 g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
612 _("Truncate not supported on base stream"));
616 base_stream_seekable = G_SEEKABLE (base_stream);
618 flushed = flush_buffer (bstream, cancellable, error);
621 return g_seekable_truncate (base_stream_seekable, offset, cancellable, error);
624 /* ************************** */
625 /* Async stuff implementation */
626 /* ************************** */
628 /* TODO: This should be using the base class async ops, not threads */
632 guint flush_stream : 1;
633 guint close_stream : 1;
638 free_flush_data (gpointer data)
640 g_slice_free (FlushData, data);
643 /* This function is used by all three (i.e.
644 * _write, _flush, _close) functions since
645 * all of them will need to flush the buffer
646 * and so closing and writing is just a special
647 * case of flushing + some additional stuff */
649 flush_buffer_thread (GTask *task,
652 GCancellable *cancellable)
654 GBufferedOutputStream *stream;
655 GOutputStream *base_stream;
658 GError *error = NULL;
660 stream = G_BUFFERED_OUTPUT_STREAM (object);
662 base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
664 res = flush_buffer (stream, cancellable, &error);
666 /* if flushing the buffer didn't work don't even bother
667 * to flush the stream but just report that error */
668 if (res && fdata->flush_stream)
669 res = g_output_stream_flush (base_stream, cancellable, &error);
671 if (fdata->close_stream)
674 /* if flushing the buffer or the stream returned
675 * an error report that first error but still try
676 * to close the stream */
677 if (g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream)))
680 g_output_stream_close (base_stream, cancellable, NULL);
682 res = g_output_stream_close (base_stream, cancellable, &error);
687 g_task_return_error (task, error);
689 g_task_return_boolean (task, TRUE);
693 g_buffered_output_stream_flush_async (GOutputStream *stream,
695 GCancellable *cancellable,
696 GAsyncReadyCallback callback,
702 fdata = g_slice_new (FlushData);
703 fdata->flush_stream = TRUE;
704 fdata->close_stream = FALSE;
706 task = g_task_new (stream, cancellable, callback, data);
707 g_task_set_task_data (task, fdata, free_flush_data);
708 g_task_set_priority (task, io_priority);
710 g_task_run_in_thread (task, flush_buffer_thread);
711 g_object_unref (task);
715 g_buffered_output_stream_flush_finish (GOutputStream *stream,
716 GAsyncResult *result,
719 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
721 return g_task_propagate_boolean (G_TASK (result), error);
725 g_buffered_output_stream_close_async (GOutputStream *stream,
727 GCancellable *cancellable,
728 GAsyncReadyCallback callback,
734 fdata = g_slice_new (FlushData);
735 fdata->close_stream = TRUE;
737 task = g_task_new (stream, cancellable, callback, data);
738 g_task_set_task_data (task, fdata, free_flush_data);
739 g_task_set_priority (task, io_priority);
741 g_task_run_in_thread (task, flush_buffer_thread);
742 g_object_unref (task);
746 g_buffered_output_stream_close_finish (GOutputStream *stream,
747 GAsyncResult *result,
750 g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
752 return g_task_propagate_boolean (G_TASK (result), error);