1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
3 * Copyright (C) 2012 Igalia, S.L.
10 #include <glib/gi18n-lib.h>
11 #include "soup-cache-input-stream.h"
12 #include "soup-message-body.h"
/* Forward declaration: the pollable-interface initializer is named by the
 * type-definition macro below before its body appears in the file. */
14 static void soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
/* Define the SoupCacheInputStream type with SOUP_TYPE_FILTER_INPUT_STREAM as
 * its parent, and register soup_cache_input_stream_pollable_init() as the
 * initializer of its G_TYPE_POLLABLE_INPUT_STREAM implementation. */
16 G_DEFINE_TYPE_WITH_CODE (SoupCacheInputStream, soup_cache_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM,
17 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
18 soup_cache_input_stream_pollable_init))
/* Signal ids of this class; the CACHING_FINISHED slot is filled in by
 * soup_cache_input_stream_class_init(). */
35 static guint signals[LAST_SIGNAL] = { 0 };
/* Private per-instance state of a SoupCacheInputStream.
 * NOTE(review): other members used by the code in this file (bytes_written,
 * buffer_queue) are not visible in this excerpt. */
37 struct _SoupCacheInputStreamPrivate
/* Stream the data read so far is written to. Set either through the
 * "output-stream" property or from g_file_replace_finish(). */
39 GOutputStream *output_stream;
/* Cancellable passed to the asynchronous file-replace and write operations. */
40 GCancellable *cancellable;
/* Set to TRUE by read_internal(); the exact condition is not visible in this
 * excerpt (presumably end of the base stream -- confirm). */
43 gboolean read_finished;
/* Buffer handed to the most recent asynchronous write; cleared by
 * write_ready_cb() once that write has been accounted for. */
44 SoupBuffer *current_writing_buffer;
/* Forward declaration: try_write_next_buffer() calls this before its
 * definition appears further down in the file. */
48 static void soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream);
51 notify_and_clear (SoupCacheInputStream *istream, GError *error)
53 SoupCacheInputStreamPrivate *priv = istream->priv;
55 g_signal_emit (istream, signals[CACHING_FINISHED], 0, priv->bytes_written, error);
57 g_clear_object (&priv->cancellable);
58 g_clear_object (&priv->output_stream);
/*
 * try_write_next_buffer:
 * Decides how to continue after the output stream became available or a
 * write completed:
 *  - no write in flight and buffers queued: start writing the next buffer;
 *  - otherwise, reading already finished: report completion without error;
 *  - otherwise, this input stream is closed: report G_IO_ERROR_CLOSED.
 * In any other situation nothing is done here.
 */
62 try_write_next_buffer (SoupCacheInputStream *istream)
64 SoupCacheInputStreamPrivate *priv = istream->priv;
66 if (priv->current_writing_buffer == NULL && priv->buffer_queue->length)
67 soup_cache_input_stream_write_next_buffer (istream);
68 else if (priv->read_finished)
69 notify_and_clear (istream, NULL);
70 else if (g_input_stream_is_closed (G_INPUT_STREAM (istream))) {
/* The stream was closed before all data had been read. */
72 g_set_error_literal (&error, G_IO_ERROR, G_IO_ERROR_CLOSED,
73 _("Network stream unexpectedly closed"));
74 notify_and_clear (istream, error);
/*
 * file_replaced_cb:
 * Completion callback of the g_file_replace_async() call issued by
 * soup_cache_input_stream_new(). user_data is the SoupCacheInputStream,
 * carrying the reference taken when the operation was started.
 */
79 file_replaced_cb (GObject *source,
83 SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (user_data);
84 SoupCacheInputStreamPrivate *priv = istream->priv;
/* Store the stream for the replaced file; error is filled in on failure. */
87 priv->output_stream = (GOutputStream *) g_file_replace_finish (G_FILE (source), res, &error);
/* NOTE(review): the condition choosing between the next two calls is not
 * visible in this excerpt; presumably failure reports the error and success
 * starts writing whatever has been queued -- confirm. */
90 notify_and_clear (istream, error);
92 try_write_next_buffer (istream);
/* Drop the reference that was passed along as user_data. */
94 g_object_unref (istream);
/*
 * soup_cache_input_stream_init:
 * Instance initializer: looks up the instance-private data and creates the
 * (initially empty) queue of buffers waiting to be written.
 */
98 soup_cache_input_stream_init (SoupCacheInputStream *self)
100 SoupCacheInputStreamPrivate *priv =
101 G_TYPE_INSTANCE_GET_PRIVATE (self, SOUP_TYPE_CACHE_INPUT_STREAM,
102 SoupCacheInputStreamPrivate);
104 priv->buffer_queue = g_queue_new ();
/*
 * soup_cache_input_stream_get_property:
 * GObject property getter. PROP_OUTPUT_STREAM yields the current output
 * stream; G_OBJECT_WARN_INVALID_PROPERTY_ID reports ids that are not handled
 * (presumably from the switch's default case, which is not visible here).
 */
109 soup_cache_input_stream_get_property (GObject *object,
110 guint property_id, GValue *value, GParamSpec *pspec)
112 SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object);
113 SoupCacheInputStreamPrivate *priv = self->priv;
115 switch (property_id) {
116 case PROP_OUTPUT_STREAM:
117 g_value_set_object (value, priv->output_stream);
120 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
/*
 * soup_cache_input_stream_set_property:
 * GObject property setter. PROP_OUTPUT_STREAM stores a new reference to the
 * given output stream; G_OBJECT_WARN_INVALID_PROPERTY_ID reports ids that
 * are not handled (presumably from the switch's default case, which is not
 * visible here).
 */
126 soup_cache_input_stream_set_property (GObject *object,
127 guint property_id, const GValue *value, GParamSpec *pspec)
129 SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object);
130 SoupCacheInputStreamPrivate *priv = self->priv;
132 switch (property_id) {
133 case PROP_OUTPUT_STREAM:
/* g_value_dup_object() returns its own reference, released in finalize
 * or in notify_and_clear(). */
134 priv->output_stream = g_value_dup_object (value);
137 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
/*
 * soup_cache_input_stream_finalize:
 * Releases everything the instance still owns: the cancellable, the output
 * stream, the buffer of an unfinished write and all buffers still queued,
 * then chains up to the parent class.
 */
143 soup_cache_input_stream_finalize (GObject *object)
145 SoupCacheInputStream *self = (SoupCacheInputStream *)object;
146 SoupCacheInputStreamPrivate *priv = self->priv;
148 g_clear_object (&priv->cancellable);
149 g_clear_object (&priv->output_stream);
150 g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
/* Frees the queue itself and every SoupBuffer left in it. */
151 g_queue_free_full (priv->buffer_queue, (GDestroyNotify) soup_buffer_free);
153 G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->finalize (object);
/*
 * write_ready_cb:
 * Completion callback of the g_output_stream_write_async() call started in
 * soup_cache_input_stream_write_next_buffer(). @istream carries the
 * reference taken when the write was started; it is released here.
 */
157 write_ready_cb (GObject *source, GAsyncResult *result, SoupCacheInputStream *istream)
159 GOutputStream *ostream = G_OUTPUT_STREAM (source);
160 SoupCacheInputStreamPrivate *priv = istream->priv;
163 GError *error = NULL;
165 write_size = g_output_stream_write_finish (ostream, result, &error);
/* NOTE(review): the guard around the next two lines is not visible in this
 * excerpt; presumably they form the error path, which reports the failure,
 * drops the reference and returns -- confirm. */
167 notify_and_clear (istream, error);
168 g_object_unref (istream);
172 /* Check that we have written everything */
173 pending = priv->current_writing_buffer->length - write_size;
/* The unwritten tail of the buffer goes back to the head of the queue so
 * that it is the next thing to be written. NOTE(review): the condition
 * guarding this (presumably pending being non-zero) is not visible. */
175 SoupBuffer *subbuffer = soup_buffer_new_subbuffer (priv->current_writing_buffer,
176 write_size, pending);
177 g_queue_push_head (priv->buffer_queue, subbuffer);
/* Account for the bytes written and release the buffer just handled. */
180 priv->bytes_written += write_size;
181 g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
/* Continue with the next queued buffer, or finish. */
183 try_write_next_buffer (istream);
184 g_object_unref (istream);
/*
 * soup_cache_input_stream_write_next_buffer:
 * Takes the buffer at the head of the queue and starts writing it
 * asynchronously to the output stream. The output stream must exist and be
 * open (asserted below).
 */
188 soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream)
190 SoupCacheInputStreamPrivate *priv = istream->priv;
191 SoupBuffer *buffer = g_queue_pop_head (priv->buffer_queue);
194 g_assert (priv->output_stream && !g_output_stream_is_closed (priv->output_stream));
/* The popped buffer becomes the one in flight; any previous one is freed. */
196 g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
197 priv->current_writing_buffer = buffer;
/* Write with default priority while more than 10 buffers are queued.
 * NOTE(review): the branch selecting G_PRIORITY_LOW is not visible in this
 * excerpt; presumably it is the else case -- confirm. */
199 if (priv->buffer_queue->length > 10)
200 priority = G_PRIORITY_DEFAULT;
202 priority = G_PRIORITY_LOW;
/* The extra reference on istream is released in write_ready_cb(). */
204 g_output_stream_write_async (priv->output_stream, buffer->data, buffer->length,
205 priority, priv->cancellable,
206 (GAsyncReadyCallback) write_ready_cb,
207 g_object_ref (istream));
/*
 * read_internal:
 * Common implementation of the blocking and non-blocking reads. Reads from
 * the base stream and queues a copy of the bytes read so that they can be
 * written to the output stream.
 * NOTE(review): several lines of this function (some parameters, the
 * conditions around the read_finished handling and the return statements)
 * are not visible in this excerpt.
 */
211 read_internal (GInputStream *stream,
215 GCancellable *cancellable,
218 SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream);
219 SoupCacheInputStreamPrivate *priv = istream->priv;
220 GInputStream *base_stream;
/* The actual read is delegated to the wrapped base stream. */
223 base_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (stream));
224 nread = g_pollable_stream_read (base_stream, buffer, count, blocking,
/* Nothing is queued when the read failed or reading had already finished. */
227 if (G_UNLIKELY (nread == -1 || priv->read_finished))
/* presumably reached when the base stream reports end of data -- confirm */
231 priv->read_finished = TRUE;
/* With no write in flight and an output stream in place, completion can be
 * reported right away. */
233 if (priv->current_writing_buffer == NULL && priv->output_stream)
234 notify_and_clear (istream, NULL);
/* Queue a copy (SOUP_MEMORY_COPY) of the bytes just read. */
236 SoupBuffer *soup_buffer = soup_buffer_new (SOUP_MEMORY_COPY, buffer, nread);
237 g_queue_push_tail (priv->buffer_queue, soup_buffer);
/* Start writing at once if no write is in flight and the output stream is
 * already available. */
239 if (priv->current_writing_buffer == NULL && priv->output_stream)
240 soup_cache_input_stream_write_next_buffer (istream);
/*
 * soup_cache_input_stream_read_fn:
 * GInputStream read_fn implementation: forwards to read_internal() with the
 * blocking flag set to TRUE.
 */
247 soup_cache_input_stream_read_fn (GInputStream *stream,
250 GCancellable *cancellable,
253 return read_internal (stream, buffer, count, TRUE,
/*
 * soup_cache_input_stream_read_nonblocking:
 * GPollableInputStream read_nonblocking implementation: forwards to
 * read_internal() with the blocking flag set to FALSE.
 */
258 soup_cache_input_stream_read_nonblocking (GPollableInputStream *stream,
263 return read_internal (G_INPUT_STREAM (stream), buffer, count, FALSE,
/*
 * soup_cache_input_stream_pollable_init:
 * GPollableInputStream interface initializer: installs this class's
 * read_nonblocking implementation. No other interface member is set in the
 * lines visible here.
 */
268 soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
269 gpointer interface_data)
271 pollable_interface->read_nonblocking = soup_cache_input_stream_read_nonblocking;
/*
 * soup_cache_input_stream_close_fn:
 * GInputStream close_fn implementation. When the stream is closed before
 * reading finished, the caching operation is aborted; the close itself is
 * then delegated to the parent class.
 */
275 soup_cache_input_stream_close_fn (GInputStream *stream,
276 GCancellable *cancellable,
279 SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream);
280 SoupCacheInputStreamPrivate *priv = istream->priv;
282 if (!priv->read_finished) {
283 if (priv->output_stream) {
284 /* Cancel any pending write operation or return an error if none. */
285 if (g_output_stream_has_pending (priv->output_stream))
286 g_cancellable_cancel (priv->cancellable);
/* No write pending: report the incomplete cache entry through the
 * caching-finished notification. */
288 GError *error = NULL;
289 g_set_error_literal (&error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT,
290 _("Failed to completely cache the resource"));
291 notify_and_clear (istream, error);
293 } else if (priv->cancellable)
294 /* The file_replace_async() hasn't finished yet */
295 g_cancellable_cancel (priv->cancellable);
298 return G_INPUT_STREAM_CLASS (soup_cache_input_stream_parent_class)->close_fn (stream, cancellable, error);
/*
 * soup_cache_input_stream_class_init:
 * Class initializer: registers the private data, installs the GObject and
 * GInputStream virtual methods, the "output-stream" property and the
 * "caching-finished" signal.
 */
302 soup_cache_input_stream_class_init (SoupCacheInputStreamClass *klass)
304 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
305 GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass);
307 g_type_class_add_private (klass, sizeof (SoupCacheInputStreamPrivate));
309 gobject_class->get_property = soup_cache_input_stream_get_property;
310 gobject_class->set_property = soup_cache_input_stream_set_property;
311 gobject_class->finalize = soup_cache_input_stream_finalize;
313 istream_class->read_fn = soup_cache_input_stream_read_fn;
314 istream_class->close_fn = soup_cache_input_stream_close_fn;
/* "output-stream" is construct-only: it can be given at construction time
 * but not changed afterwards. */
316 g_object_class_install_property (gobject_class, PROP_OUTPUT_STREAM,
317 g_param_spec_object ("output-stream", "Output stream",
318 "the output stream where to write.",
319 G_TYPE_OUTPUT_STREAM,
321 G_PARAM_CONSTRUCT_ONLY |
322 G_PARAM_STATIC_STRINGS));
/* "caching-finished" is emitted by notify_and_clear() with the number of
 * bytes written and an error (NULL on success). */
324 signals[CACHING_FINISHED] =
325 g_signal_new ("caching-finished",
326 G_OBJECT_CLASS_TYPE (gobject_class),
328 G_STRUCT_OFFSET (SoupCacheInputStreamClass, caching_finished),
332 G_TYPE_INT, G_TYPE_ERROR);
/*
 * soup_cache_input_stream_new:
 * @base_stream: the stream the new object reads from
 *
 * Creates a SoupCacheInputStream on top of @base_stream (which is not closed
 * together with the new stream) and starts replacing `file` asynchronously;
 * file_replaced_cb() picks up the resulting output stream.
 * NOTE(review): the parameter that declares `file` is not visible in this
 * excerpt.
 *
 * Returns: the new stream, as a GInputStream.
 */
336 soup_cache_input_stream_new (GInputStream *base_stream,
339 SoupCacheInputStream *istream = g_object_new (SOUP_TYPE_CACHE_INPUT_STREAM,
340 "base-stream", base_stream,
341 "close-base-stream", FALSE,
/* The same cancellable is later passed to the write operations, so closing
 * the stream early can cancel whichever operation is pending. */
344 istream->priv->cancellable = g_cancellable_new ();
/* The callback receives its own reference on the stream and releases it. */
345 g_file_replace_async (file, NULL, FALSE,
346 G_FILE_CREATE_PRIVATE | G_FILE_CREATE_REPLACE_DESTINATION,
347 G_PRIORITY_DEFAULT, istream->priv->cancellable,
348 file_replaced_cb, g_object_ref (istream));
350 return (GInputStream *) istream;