#include <glib/gi18n-lib.h>
#include "soup-cache-input-stream.h"
+#include "soup-marshal.h"
#include "soup-message-body.h"
static void soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
LAST_PROP
};
+enum {
+ CACHING_FINISHED,
+
+ LAST_SIGNAL
+};
+
+static guint signals[LAST_SIGNAL] = { 0 };
+
struct _SoupCacheInputStreamPrivate
{
GOutputStream *output_stream;
+ GCancellable *cancellable;
gsize bytes_written;
gboolean read_finished;
SoupBuffer *current_writing_buffer;
GQueue *buffer_queue;
-
- GTask *task;
};
static void soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream);
{
SoupCacheInputStreamPrivate *priv = istream->priv;
- if (error)
- g_task_return_error (priv->task, error);
- else
- g_task_return_int (priv->task, priv->bytes_written);
+ g_signal_emit (istream, signals[CACHING_FINISHED], 0, priv->bytes_written, error);
+ g_clear_object (&priv->cancellable);
g_clear_object (&priv->output_stream);
- g_clear_object (&priv->task);
-}
-
-gsize
-soup_cache_input_stream_cache_finish (SoupCacheInputStream *istream,
- GAsyncResult *result,
- GError **error)
-{
- return g_task_propagate_int (G_TASK (result), error);
}
static inline void
priv->output_stream = (GOutputStream *) g_file_replace_finish (G_FILE (source), res, &error);
if (error)
- g_task_return_error (priv->task, error);
+ notify_and_clear (istream, error);
else
try_write_next_buffer (istream);
-}
-
-void
-soup_cache_input_stream_cache (SoupCacheInputStream *istream,
- GFile *file,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- SoupCacheInputStreamPrivate *priv = istream->priv;
- priv->task = g_task_new (istream, cancellable, callback, user_data);
-
- g_file_replace_async (file, NULL, FALSE,
- G_FILE_CREATE_PRIVATE | G_FILE_CREATE_REPLACE_DESTINATION,
- G_PRIORITY_LOW, cancellable, file_replaced_cb, istream);
+ g_object_unref (istream);
}
static void
}
static void
-soup_cache_input_stream_dispose (GObject *object)
-{
- SoupCacheInputStreamPrivate *priv = SOUP_CACHE_INPUT_STREAM (object)->priv;
-
- g_clear_object (&priv->output_stream);
- g_clear_object (&priv->task);
-
- G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->dispose (object);
-}
-
-static void
soup_cache_input_stream_finalize (GObject *object)
{
SoupCacheInputStream *self = (SoupCacheInputStream *)object;
SoupCacheInputStreamPrivate *priv = self->priv;
+ g_clear_object (&priv->cancellable);
+ g_clear_object (&priv->output_stream);
g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
g_queue_free_full (priv->buffer_queue, (GDestroyNotify) soup_buffer_free);
int priority;
g_assert (priv->output_stream && !g_output_stream_is_closed (priv->output_stream));
- g_assert (priv->task);
g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
priv->current_writing_buffer = buffer;
priority = G_PRIORITY_LOW;
g_output_stream_write_async (priv->output_stream, buffer->data, buffer->length,
- priority, g_task_get_cancellable (priv->task),
+ priority, priv->cancellable,
(GAsyncReadyCallback) write_ready_cb,
g_object_ref (istream));
}
nread = g_pollable_stream_read (base_stream, buffer, count, blocking,
cancellable, error);
- if (G_UNLIKELY (nread == -1 || priv->read_finished || !priv->task))
+ if (G_UNLIKELY (nread == -1 || priv->read_finished))
return nread;
if (nread == 0) {
pollable_interface->read_nonblocking = soup_cache_input_stream_read_nonblocking;
}
+static gboolean
+soup_cache_input_stream_close_fn (GInputStream *stream,
+ GCancellable *cancellable,
+ GError **error)
+{
+ SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream);
+ SoupCacheInputStreamPrivate *priv = istream->priv;
+
+ if (!priv->read_finished) {
+ if (priv->output_stream) {
+ /* Cancel any pending write operation or return an error if none. */
+ if (g_output_stream_has_pending (priv->output_stream))
+ g_cancellable_cancel (priv->cancellable);
+ else {
+ GError *error = NULL;
+ g_set_error_literal (&error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT,
+ _("Failed to completely cache the resource"));
+ notify_and_clear (istream, error);
+ }
+ } else if (priv->cancellable)
+ /* The file_replace_async() hasn't finished yet */
+ g_cancellable_cancel (priv->cancellable);
+ }
+
+ return G_INPUT_STREAM_CLASS (soup_cache_input_stream_parent_class)->close_fn (stream, cancellable, error);
+}
+
static void
soup_cache_input_stream_class_init (SoupCacheInputStreamClass *klass)
{
gobject_class->get_property = soup_cache_input_stream_get_property;
gobject_class->set_property = soup_cache_input_stream_set_property;
- gobject_class->dispose = soup_cache_input_stream_dispose;
gobject_class->finalize = soup_cache_input_stream_finalize;
istream_class->read_fn = soup_cache_input_stream_read_fn;
+ istream_class->close_fn = soup_cache_input_stream_close_fn;
g_object_class_install_property (gobject_class, PROP_OUTPUT_STREAM,
g_param_spec_object ("output-stream", "Output stream",
G_PARAM_READWRITE |
G_PARAM_CONSTRUCT_ONLY |
G_PARAM_STATIC_STRINGS));
+
+ signals[CACHING_FINISHED] =
+ g_signal_new ("caching-finished",
+ G_OBJECT_CLASS_TYPE (gobject_class),
+ G_SIGNAL_RUN_FIRST,
+ G_STRUCT_OFFSET (SoupCacheInputStreamClass, caching_finished),
+ NULL, NULL,
+ _soup_marshal_NONE__INT_BOXED,
+ G_TYPE_NONE, 2,
+ G_TYPE_INT, G_TYPE_ERROR);
}
GInputStream *
-soup_cache_input_stream_new (GInputStream *base_stream)
+soup_cache_input_stream_new (GInputStream *base_stream,
+ GFile *file)
{
- return g_object_new (SOUP_TYPE_CACHE_INPUT_STREAM,
- "base-stream", base_stream,
- "close-base-stream", FALSE,
- NULL);
+ SoupCacheInputStream *istream = g_object_new (SOUP_TYPE_CACHE_INPUT_STREAM,
+ "base-stream", base_stream,
+ "close-base-stream", FALSE,
+ NULL);
+
+ istream->priv->cancellable = g_cancellable_new ();
+ g_file_replace_async (file, NULL, FALSE,
+ G_FILE_CREATE_PRIVATE | G_FILE_CREATE_REPLACE_DESTINATION,
+ G_PRIORITY_DEFAULT, istream->priv->cancellable,
+ file_replaced_cb, g_object_ref (istream));
+
+ return (GInputStream *) istream;
}
} StreamHelper;
static void
-istream_cache_cb (GObject *source,
- GAsyncResult *res,
- gpointer user_data)
+istream_caching_finished (SoupCacheInputStream *istream,
+ gsize bytes_written,
+ GError *error,
+ gpointer user_data)
{
- SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (source);
StreamHelper *helper = (StreamHelper *) user_data;
SoupCache *cache = helper->cache;
SoupCacheEntry *entry = helper->entry;
- GError *error = NULL;
- entry->dirty = FALSE;
- g_clear_object (&entry->cancellable);
--cache->priv->n_pending;
- entry->length = soup_cache_input_stream_cache_finish (istream, res, &error);
+ entry->dirty = FALSE;
+ entry->length = bytes_written;
+ g_clear_object (&entry->cancellable);
if (error) {
/* Update cache size */
g_slice_free (StreamHelper, helper);
}
-
static GInputStream*
soup_cache_content_processor_wrap_input (SoupContentProcessor *processor,
GInputStream *base_stream,
return NULL;
}
- ++cache->priv->n_pending;
-
- istream = soup_cache_input_stream_new (base_stream);
-
- file = get_file_from_entry (cache, entry);
entry->cancellable = g_cancellable_new ();
+ ++cache->priv->n_pending;
helper = g_slice_new (StreamHelper);
helper->cache = g_object_ref (cache);
helper->entry = entry;
- soup_cache_input_stream_cache (SOUP_CACHE_INPUT_STREAM (istream), file, entry->cancellable,
- (GAsyncReadyCallback) istream_cache_cb, helper);
+ file = get_file_from_entry (cache, entry);
+ istream = soup_cache_input_stream_new (base_stream, file);
g_object_unref (file);
+ g_signal_connect (istream, "caching-finished", G_CALLBACK (istream_caching_finished), helper);
+
return istream;
}