SoupCacheInputStream: new input stream filter that writes to the cache
authorSergio Villar Senin <svillar@igalia.com>
Mon, 30 Jul 2012 10:40:32 +0000 (12:40 +0200)
committerSergio Villar Senin <svillar@igalia.com>
Tue, 18 Dec 2012 15:06:06 +0000 (16:06 +0100)
The SoupCacheInputStream will be added to the stream stack as any other
GPollableInputStream. It will transparently read data from the underlying
stream and will pass it unmodified to the upper level.

Apart from that the stream writes everything it reads to a local file. Once
the caching finishes a callback will be called. Caching may be cancelled at
any point by providing a GCancellable.

https://bugzilla.gnome.org/show_bug.cgi?id=682112

libsoup/Makefile.am
libsoup/soup-cache-input-stream.c [new file with mode: 0644]
libsoup/soup-cache-input-stream.h [new file with mode: 0644]
po/POTFILES.in

index 87629ed..59553f0 100644 (file)
@@ -114,6 +114,8 @@ libsoup_2_4_la_SOURCES =            \
        soup-body-output-stream.h       \
        soup-body-output-stream.c       \
        soup-cache.c                    \
+       soup-cache-input-stream.h       \
+       soup-cache-input-stream.c       \
        soup-cache-private.h            \
        soup-client-input-stream.h      \
        soup-client-input-stream.c      \
diff --git a/libsoup/soup-cache-input-stream.c b/libsoup/soup-cache-input-stream.c
new file mode 100644 (file)
index 0000000..a44652a
--- /dev/null
@@ -0,0 +1,333 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2012 Igalia, S.L.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <glib/gi18n-lib.h>
+#include "soup-cache-input-stream.h"
+#include "soup-message-body.h"
+
+static void soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
+
+G_DEFINE_TYPE_WITH_CODE (SoupCacheInputStream, soup_cache_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM,
+                        G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+                                               soup_cache_input_stream_pollable_init))
+
+/* properties */
+enum {
+       PROP_0,
+
+       PROP_OUTPUT_STREAM,
+
+       LAST_PROP
+};
+
+struct _SoupCacheInputStreamPrivate
+{
+       GOutputStream *output_stream;
+       gsize bytes_written;
+
+       gboolean read_finished;
+       SoupBuffer *current_writing_buffer;
+       GQueue *buffer_queue;
+
+       GTask *task;
+};
+
+static void soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream);
+
+static inline void
+notify_and_clear (SoupCacheInputStream *istream, GError *error)
+{
+       SoupCacheInputStreamPrivate *priv = istream->priv;
+
+       if (error)
+               g_task_return_error (priv->task, error);
+       else
+               g_task_return_int (priv->task, priv->bytes_written);
+
+       g_clear_object (&priv->output_stream);
+       g_clear_object (&priv->task);
+}
+
+gsize
+soup_cache_input_stream_cache_finish (SoupCacheInputStream  *istream,
+                                     GAsyncResult          *result,
+                                     GError               **error)
+{
+       return g_task_propagate_int (G_TASK (result), error);
+}
+
+static inline void
+try_write_next_buffer (SoupCacheInputStream *istream)
+{
+       SoupCacheInputStreamPrivate *priv = istream->priv;
+
+       if (priv->current_writing_buffer == NULL && priv->buffer_queue->length)
+               soup_cache_input_stream_write_next_buffer (istream);
+       else if (priv->read_finished)
+               notify_and_clear (istream, NULL);
+       else if (g_input_stream_is_closed (G_INPUT_STREAM (istream))) {
+               GError *error = NULL;
+               g_set_error_literal (&error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+                                    _("Network stream unexpectedly closed"));
+               notify_and_clear (istream, error);
+       }
+}
+
+static void
+file_replaced_cb (GObject      *source,
+                 GAsyncResult *res,
+                 gpointer      user_data)
+{
+       SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (user_data);
+       SoupCacheInputStreamPrivate *priv = istream->priv;
+       GError *error = NULL;
+
+       priv->output_stream = (GOutputStream *) g_file_replace_finish (G_FILE (source), res, &error);
+
+       if (error)
+               g_task_return_error (priv->task, error);
+       else
+               try_write_next_buffer (istream);
+}
+
+void
+soup_cache_input_stream_cache (SoupCacheInputStream  *istream,
+                              GFile                 *file,
+                              GCancellable          *cancellable,
+                              GAsyncReadyCallback    callback,
+                              gpointer               user_data)
+{
+       SoupCacheInputStreamPrivate *priv = istream->priv;
+
+       priv->task = g_task_new (istream, cancellable, callback, user_data);
+
+       g_file_replace_async (file, NULL, FALSE,
+                             G_FILE_CREATE_PRIVATE | G_FILE_CREATE_REPLACE_DESTINATION,
+                             G_PRIORITY_LOW, cancellable, file_replaced_cb, istream);
+}
+
+static void
+soup_cache_input_stream_init (SoupCacheInputStream *self)
+{
+       SoupCacheInputStreamPrivate *priv =
+               G_TYPE_INSTANCE_GET_PRIVATE (self, SOUP_TYPE_CACHE_INPUT_STREAM,
+                                            SoupCacheInputStreamPrivate);
+
+       priv->buffer_queue = g_queue_new ();
+       self->priv = priv;
+}
+
+static void
+soup_cache_input_stream_get_property (GObject *object,
+                                     guint property_id, GValue *value, GParamSpec *pspec)
+{
+       SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object);
+       SoupCacheInputStreamPrivate *priv = self->priv;
+
+       switch (property_id) {
+       case PROP_OUTPUT_STREAM:
+               g_value_set_object (value, priv->output_stream);
+               break;
+       default:
+               G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+               break;
+       }
+}
+
+static void
+soup_cache_input_stream_set_property (GObject *object,
+                                     guint property_id, const GValue *value, GParamSpec *pspec)
+{
+       SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object);
+       SoupCacheInputStreamPrivate *priv = self->priv;
+
+       switch (property_id) {
+       case PROP_OUTPUT_STREAM:
+               priv->output_stream = g_value_dup_object (value);
+               break;
+       default:
+               G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+               break;
+       }
+}
+
+static void
+soup_cache_input_stream_dispose (GObject *object)
+{
+       SoupCacheInputStreamPrivate *priv = SOUP_CACHE_INPUT_STREAM (object)->priv;
+
+       g_clear_object (&priv->output_stream);
+       g_clear_object (&priv->task);
+
+       G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->dispose (object);
+}
+
+static void
+soup_cache_input_stream_finalize (GObject *object)
+{
+       SoupCacheInputStream *self = (SoupCacheInputStream *)object;
+       SoupCacheInputStreamPrivate *priv = self->priv;
+
+       g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
+       g_queue_free_full (priv->buffer_queue, (GDestroyNotify) soup_buffer_free);
+
+       G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->finalize (object);
+}
+
+static void
+write_ready_cb (GObject *source, GAsyncResult *result, SoupCacheInputStream *istream)
+{
+       GOutputStream *ostream = G_OUTPUT_STREAM (source);
+       SoupCacheInputStreamPrivate *priv = istream->priv;
+       gssize write_size;
+       gsize pending;
+       GError *error = NULL;
+
+       write_size = g_output_stream_write_finish (ostream, result, &error);
+       if (error) {
+               notify_and_clear (istream, error);
+               g_object_unref (istream);
+               return;
+       }
+
+       /* Check that we have written everything */
+       pending = priv->current_writing_buffer->length - write_size;
+       if (pending) {
+               SoupBuffer *subbuffer = soup_buffer_new_subbuffer (priv->current_writing_buffer,
+                                                                  write_size, pending);
+               g_queue_push_head (priv->buffer_queue, subbuffer);
+       }
+
+       priv->bytes_written += write_size;
+       g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
+
+       try_write_next_buffer (istream);
+       g_object_unref (istream);
+}
+
+static void
+soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream)
+{
+       SoupCacheInputStreamPrivate *priv = istream->priv;
+       SoupBuffer *buffer = g_queue_pop_head (priv->buffer_queue);
+       int priority;
+
+       g_assert (priv->output_stream && !g_output_stream_is_closed (priv->output_stream));
+       g_assert (priv->task);
+
+       g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
+       priv->current_writing_buffer = buffer;
+
+       if (priv->buffer_queue->length > 10)
+               priority = G_PRIORITY_DEFAULT;
+       else
+               priority = G_PRIORITY_LOW;
+
+       g_output_stream_write_async (priv->output_stream, buffer->data, buffer->length,
+                                    priority, g_task_get_cancellable (priv->task),
+                                    (GAsyncReadyCallback) write_ready_cb,
+                                    g_object_ref (istream));
+}
+
+static gssize
+read_internal (GInputStream  *stream,
+              void          *buffer,
+              gsize          count,
+              gboolean       blocking,
+              GCancellable  *cancellable,
+              GError       **error)
+{
+       SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream);
+       SoupCacheInputStreamPrivate *priv = istream->priv;
+       GInputStream *base_stream;
+       gssize nread;
+
+       base_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (stream));
+       nread = g_pollable_stream_read (base_stream, buffer, count, blocking,
+                                       cancellable, error);
+
+       if (G_UNLIKELY (nread == -1 || priv->read_finished || !priv->task))
+               return nread;
+
+       if (nread == 0) {
+               priv->read_finished = TRUE;
+
+               if (priv->current_writing_buffer == NULL && priv->output_stream)
+                       notify_and_clear (istream, NULL);
+       } else {
+               SoupBuffer *soup_buffer = soup_buffer_new (SOUP_MEMORY_COPY, buffer, nread);
+               g_queue_push_tail (priv->buffer_queue, soup_buffer);
+
+               if (priv->current_writing_buffer == NULL && priv->output_stream)
+                       soup_cache_input_stream_write_next_buffer (istream);
+       }
+
+       return nread;
+}
+
+static gssize
+soup_cache_input_stream_read_fn (GInputStream  *stream,
+                                void          *buffer,
+                                gsize          count,
+                                GCancellable  *cancellable,
+                                GError       **error)
+{
+       return read_internal (stream, buffer, count, TRUE,
+                             cancellable, error);
+}
+
+static gssize
+soup_cache_input_stream_read_nonblocking (GPollableInputStream  *stream,
+                                         void                  *buffer,
+                                         gsize                  count,
+                                         GError               **error)
+{
+       return read_internal (G_INPUT_STREAM (stream), buffer, count, FALSE,
+                             NULL, error);
+}
+
+static void
+soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
+                                      gpointer interface_data)
+{
+       pollable_interface->read_nonblocking = soup_cache_input_stream_read_nonblocking;
+}
+
+static void
+soup_cache_input_stream_class_init (SoupCacheInputStreamClass *klass)
+{
+       GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+       GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass);
+
+       g_type_class_add_private (klass, sizeof (SoupCacheInputStreamPrivate));
+
+       gobject_class->get_property = soup_cache_input_stream_get_property;
+       gobject_class->set_property = soup_cache_input_stream_set_property;
+       gobject_class->dispose = soup_cache_input_stream_dispose;
+       gobject_class->finalize = soup_cache_input_stream_finalize;
+
+       istream_class->read_fn = soup_cache_input_stream_read_fn;
+
+       g_object_class_install_property (gobject_class, PROP_OUTPUT_STREAM,
+                                        g_param_spec_object ("output-stream", "Output stream",
+                                                             "the output stream where to write.",
+                                                             G_TYPE_OUTPUT_STREAM,
+                                                             G_PARAM_READWRITE |
+                                                             G_PARAM_CONSTRUCT_ONLY |
+                                                             G_PARAM_STATIC_STRINGS));
+}
+
+GInputStream *
+soup_cache_input_stream_new (GInputStream *base_stream)
+{
+       return g_object_new (SOUP_TYPE_CACHE_INPUT_STREAM,
+                            "base-stream", base_stream,
+                            "close-base-stream", FALSE,
+                            NULL);
+}
diff --git a/libsoup/soup-cache-input-stream.h b/libsoup/soup-cache-input-stream.h
new file mode 100644 (file)
index 0000000..c999d10
--- /dev/null
@@ -0,0 +1,52 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * soup-cache-input-stream.h - Header for SoupCacheInputStream
+ */
+
+#ifndef __SOUP_CACHE_INPUT_STREAM_H__
+#define __SOUP_CACHE_INPUT_STREAM_H__
+
+#include "soup-filter-input-stream.h"
+
+G_BEGIN_DECLS
+
+#define SOUP_TYPE_CACHE_INPUT_STREAM           (soup_cache_input_stream_get_type())
+#define SOUP_CACHE_INPUT_STREAM(obj)           (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_CACHE_INPUT_STREAM, SoupCacheInputStream))
+#define SOUP_CACHE_INPUT_STREAM_CLASS(klass)   (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_CACHE_INPUT_STREAM, SoupCacheInputStreamClass))
+#define SOUP_IS_CACHE_INPUT_STREAM(obj)                (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_CACHE_INPUT_STREAM))
+#define SOUP_IS_CACHE_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), SOUP_TYPE_CACHE_INPUT_STREAM))
+#define SOUP_CACHE_INPUT_STREAM_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_CACHE_INPUT_STREAM, SoupCacheInputStreamClass))
+
+typedef struct _SoupCacheInputStream      SoupCacheInputStream;
+typedef struct _SoupCacheInputStreamClass SoupCacheInputStreamClass;
+typedef struct _SoupCacheInputStreamPrivate SoupCacheInputStreamPrivate;
+
+struct _SoupCacheInputStreamClass
+{
+       SoupFilterInputStreamClass parent_class;
+};
+
+struct _SoupCacheInputStream
+{
+       SoupFilterInputStream parent;
+
+       SoupCacheInputStreamPrivate *priv;
+};
+
+GType soup_cache_input_stream_get_type (void) G_GNUC_CONST;
+
+GInputStream *soup_cache_input_stream_new               (GInputStream *base_stream);
+
+void          soup_cache_input_stream_cache             (SoupCacheInputStream  *istream,
+                                                        GFile                 *file,
+                                                        GCancellable          *cancellable,
+                                                        GAsyncReadyCallback    callback,
+                                                        gpointer               user_data);
+
+gsize         soup_cache_input_stream_cache_finish      (SoupCacheInputStream  *istream,
+                                                        GAsyncResult          *result,
+                                                        GError               **error);
+
+G_END_DECLS
+
+#endif /* __SOUP_CACHE_INPUT_STREAM_H__ */
index fff1f0e..21c70d4 100644 (file)
@@ -1,4 +1,5 @@
 libsoup/soup-body-input-stream.c
+libsoup/soup-cache-input-stream.c
 libsoup/soup-converter-wrapper.c
 libsoup/soup-message-client-io.c
 libsoup/soup-message-io.c