soup-body-output-stream.c \
soup-cache.c \
soup-cache-private.h \
+ soup-client-input-stream.h \
+ soup-client-input-stream.c \
soup-connection.h \
soup-connection.c \
soup-content-decoder.c \
soup-filter-input-stream.h \
soup-form.c \
soup-headers.c \
- soup-http-input-stream.h \
- soup-http-input-stream.c \
soup-logger.c \
soup-marshal.h \
soup-marshal.c \
#include "soup-body-input-stream.h"
#include "soup-enum-types.h"
#include "soup-filter-input-stream.h"
+#include "soup-marshal.h"
#include "soup-message-headers.h"
typedef enum {
};
enum {
+ CLOSED,
+ LAST_SIGNAL
+};
+
+static guint signals[LAST_SIGNAL] = { 0 };
+
+enum {
PROP_0,
PROP_ENCODING,
}
static gboolean
+soup_body_input_stream_close_fn (GInputStream *stream,
+ GCancellable *cancellable,
+ GError **error)
+{
+ g_signal_emit (stream, signals[CLOSED], 0);
+
+ return G_INPUT_STREAM_CLASS (soup_body_input_stream_parent_class)->close_fn (stream, cancellable, error);
+}
+
+static gboolean
soup_body_input_stream_is_readable (GPollableInputStream *stream)
{
SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream);
object_class->get_property = get_property;
input_stream_class->read_fn = soup_body_input_stream_read_fn;
+ input_stream_class->close_fn = soup_body_input_stream_close_fn;
+
+ signals[CLOSED] =
+ g_signal_new ("closed",
+ G_OBJECT_CLASS_TYPE (object_class),
+ G_SIGNAL_RUN_LAST,
+ 0,
+ NULL, NULL,
+ _soup_marshal_NONE__NONE,
+ G_TYPE_NONE, 0);
g_object_class_install_property (
object_class, PROP_ENCODING,
--- /dev/null
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * soup-client-input-stream.c
+ *
+ * Copyright 2010-2012 Red Hat, Inc.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <gio/gio.h>
+
+#include "soup-client-input-stream.h"
+#include "soup-marshal.h"
+#include "soup-message.h"
+#include "soup-message-private.h"
+
+struct _SoupClientInputStreamPrivate {
+ SoupMessage *msg;
+};
+
+enum {
+ EOF,
+ LAST_SIGNAL
+};
+
+static guint signals[LAST_SIGNAL] = { 0 };
+
+enum {
+ PROP_0,
+
+ PROP_MESSAGE
+};
+
+static GPollableInputStreamInterface *soup_client_input_stream_parent_pollable_interface;
+static void soup_client_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
+
+G_DEFINE_TYPE_WITH_CODE (SoupClientInputStream, soup_client_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+ soup_client_input_stream_pollable_init))
+
+static void
+soup_client_input_stream_init (SoupClientInputStream *stream)
+{
+ stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
+ SOUP_TYPE_CLIENT_INPUT_STREAM,
+ SoupClientInputStreamPrivate);
+}
+
+static void
+set_property (GObject *object, guint prop_id,
+ const GValue *value, GParamSpec *pspec)
+{
+ SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (object);
+
+ switch (prop_id) {
+ case PROP_MESSAGE:
+ cistream->priv->msg = g_value_dup_object (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+get_property (GObject *object, guint prop_id,
+ GValue *value, GParamSpec *pspec)
+{
+ SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (object);
+
+ switch (prop_id) {
+ case PROP_MESSAGE:
+ g_value_set_object (value, cistream->priv->msg);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static gssize
+soup_client_input_stream_read_fn (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error)
+{
+ gssize nread;
+
+ nread = G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->
+ read_fn (stream, buffer, count, cancellable, error);
+
+ if (nread == 0)
+ g_signal_emit (stream, signals[EOF], 0);
+
+ return nread;
+}
+
+static gssize
+soup_client_input_stream_read_nonblocking (GPollableInputStream *stream,
+ void *buffer,
+ gsize count,
+ GError **error)
+{
+ gssize nread;
+
+ nread = soup_client_input_stream_parent_pollable_interface->
+ read_nonblocking (stream, buffer, count, error);
+
+ if (nread == 0)
+ g_signal_emit (stream, signals[EOF], 0);
+
+ return nread;
+}
+
+static gboolean
+soup_client_input_stream_close_fn (GInputStream *stream,
+ GCancellable *cancellable,
+ GError **error)
+{
+ SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (stream);
+
+ if (!soup_message_io_run_until_finish (cistream->priv->msg,
+ cancellable, error))
+ return FALSE;
+
+ return G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->close_fn (stream, cancellable, error);
+}
+
+typedef struct {
+ SoupClientInputStream *cistream;
+ gint priority;
+ GCancellable *cancellable;
+ GSimpleAsyncResult *result;
+} CloseAsyncData;
+
+static void
+close_async_data_free (CloseAsyncData *cad)
+{
+ if (cad->cancellable)
+ g_object_unref (cad->cancellable);
+ g_object_unref (cad->result);
+ g_slice_free (CloseAsyncData, cad);
+}
+
+static void
+base_stream_closed (GObject *source, GAsyncResult *result, gpointer user_data)
+{
+ CloseAsyncData *cad = user_data;
+ GError *error = NULL;
+
+ if (G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->
+ close_finish (G_INPUT_STREAM (cad->cistream), result, &error))
+ g_simple_async_result_set_op_res_gboolean (cad->result, TRUE);
+ else
+ g_simple_async_result_take_error (cad->result, error);
+
+ g_simple_async_result_complete_in_idle (cad->result);
+ close_async_data_free (cad);
+}
+
+static gboolean
+close_async_ready (SoupMessage *msg, gpointer user_data)
+{
+ CloseAsyncData *cad = user_data;
+ GError *error = NULL;
+
+ if (soup_message_io_run_until_finish (cad->cistream->priv->msg,
+ cad->cancellable, &error)) {
+ G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->
+ close_async (G_INPUT_STREAM (cad->cistream),
+ cad->priority,
+ cad->cancellable,
+ base_stream_closed,
+ cad);
+ return FALSE;
+ } else if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ g_simple_async_result_take_error (cad->result, error);
+ g_simple_async_result_complete_in_idle (cad->result);
+ close_async_data_free (cad);
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+static void
+soup_client_input_stream_close_async (GInputStream *stream,
+ gint priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ CloseAsyncData *cad;
+ GSource *source;
+
+ cad = g_slice_new (CloseAsyncData);
+ cad->cistream = SOUP_CLIENT_INPUT_STREAM (stream);
+ cad->result = g_simple_async_result_new (G_OBJECT (stream),
+ callback, user_data,
+ soup_client_input_stream_close_async);
+ cad->priority = priority;
+ cad->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
+
+ source = soup_message_io_get_source (cad->cistream->priv->msg,
+ cancellable,
+ close_async_ready, cad);
+ g_source_set_priority (source, priority);
+ g_source_attach (source, g_main_context_get_thread_default ());
+ g_source_unref (source);
+}
+
+static gboolean
+soup_client_input_stream_close_finish (GInputStream *stream,
+ GAsyncResult *result,
+ GError **error)
+{
+ GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
+
+ if (g_simple_async_result_propagate_error (simple, error))
+ return FALSE;
+ else
+ return g_simple_async_result_get_op_res_gboolean (simple);
+}
+
+static void
+soup_client_input_stream_class_init (SoupClientInputStreamClass *stream_class)
+{
+ GObjectClass *object_class = G_OBJECT_CLASS (stream_class);
+ GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (stream_class);
+
+ g_type_class_add_private (stream_class, sizeof (SoupClientInputStreamPrivate));
+
+ object_class->set_property = set_property;
+ object_class->get_property = get_property;
+
+ input_stream_class->read_fn = soup_client_input_stream_read_fn;
+ input_stream_class->close_fn = soup_client_input_stream_close_fn;
+ input_stream_class->close_async = soup_client_input_stream_close_async;
+ input_stream_class->close_finish = soup_client_input_stream_close_finish;
+
+ signals[EOF] =
+ g_signal_new ("eof",
+ G_OBJECT_CLASS_TYPE (object_class),
+ G_SIGNAL_RUN_LAST,
+ 0,
+ NULL, NULL,
+ _soup_marshal_NONE__NONE,
+ G_TYPE_NONE, 0);
+
+ g_object_class_install_property (
+ object_class, PROP_MESSAGE,
+ g_param_spec_object ("message",
+ "Message",
+ "Message",
+ SOUP_TYPE_MESSAGE,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
+}
+
+static void
+soup_client_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
+ gpointer interface_data)
+{
+ soup_client_input_stream_parent_pollable_interface =
+ g_type_interface_peek_parent (pollable_interface);
+
+ pollable_interface->read_nonblocking = soup_client_input_stream_read_nonblocking;
+}
+
+GInputStream *
+soup_client_input_stream_new (GInputStream *base_stream,
+ SoupMessage *msg)
+{
+ return g_object_new (SOUP_TYPE_CLIENT_INPUT_STREAM,
+ "base-stream", base_stream,
+ "message", msg,
+ NULL);
+}
--- /dev/null
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright 2010-2012 Red Hat, Inc.
+ */
+
+#ifndef SOUP_CLIENT_INPUT_STREAM_H
+#define SOUP_CLIENT_INPUT_STREAM_H 1
+
+#include "soup-types.h"
+#include "soup-filter-input-stream.h"
+
+G_BEGIN_DECLS
+
+#define SOUP_TYPE_CLIENT_INPUT_STREAM (soup_client_input_stream_get_type ())
+#define SOUP_CLIENT_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_CLIENT_INPUT_STREAM, SoupClientInputStream))
+#define SOUP_CLIENT_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_CLIENT_INPUT_STREAM, SoupClientInputStreamClass))
+#define SOUP_IS_CLIENT_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_CLIENT_INPUT_STREAM))
+#define SOUP_IS_CLIENT_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((obj), SOUP_TYPE_CLIENT_INPUT_STREAM))
+#define SOUP_CLIENT_INPUT_STREAM_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_CLIENT_INPUT_STREAM, SoupClientInputStreamClass))
+
+typedef struct _SoupClientInputStreamPrivate SoupClientInputStreamPrivate;
+
+typedef struct {
+ SoupFilterInputStream parent;
+
+ SoupClientInputStreamPrivate *priv;
+} SoupClientInputStream;
+
+typedef struct {
+ SoupFilterInputStreamClass parent_class;
+
+ /* Padding for future expansion */
+ void (*_libsoup_reserved1) (void);
+ void (*_libsoup_reserved2) (void);
+ void (*_libsoup_reserved3) (void);
+ void (*_libsoup_reserved4) (void);
+} SoupClientInputStreamClass;
+
+GType soup_client_input_stream_get_type (void);
+
+GInputStream *soup_client_input_stream_new (GInputStream *base_stream,
+ SoupMessage *msg);
+
+G_END_DECLS
+
+#endif /* SOUP_CLIENT_INPUT_STREAM_H */
+++ /dev/null
-/* soup-input-stream.c, based on gsocketinputstream.c
- *
- * Copyright (C) 2006-2007, 2010 Red Hat, Inc.
- * Copyright (C) 2010 Igalia, S.L.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General
- * Public License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-#include <config.h>
-
-#include <string.h>
-
-#include <glib.h>
-#include <gio/gio.h>
-
-#include "soup-http-input-stream.h"
-#include "soup-headers.h"
-#include "soup-content-sniffer.h"
-#include "soup-session.h"
-
-G_DEFINE_TYPE (SoupHTTPInputStream, soup_http_input_stream, G_TYPE_INPUT_STREAM)
-
-typedef void (*SoupHTTPInputStreamCallback)(GInputStream *);
-
-typedef struct {
- SoupSession *session;
- GMainContext *async_context;
- SoupMessage *msg;
- gboolean got_headers, finished;
- goffset offset;
-
- GCancellable *cancellable;
- guint cancel_id;
- SoupHTTPInputStreamCallback got_headers_cb;
- SoupHTTPInputStreamCallback got_chunk_cb;
- SoupHTTPInputStreamCallback finished_cb;
- SoupHTTPInputStreamCallback cancelled_cb;
-
- GQueue *leftover_queue;
-
- guchar *caller_buffer;
- gsize caller_bufsize, caller_nread;
- GAsyncReadyCallback outstanding_callback;
- GSimpleAsyncResult *result;
-
- char *sniffed_content_type;
-} SoupHTTPInputStreamPrivate;
-#define SOUP_HTTP_INPUT_STREAM_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_HTTP_INPUT_STREAM, SoupHTTPInputStreamPrivate))
-
-
-static gssize soup_http_input_stream_read (GInputStream *stream,
- void *buffer,
- gsize count,
- GCancellable *cancellable,
- GError **error);
-static gboolean soup_http_input_stream_close (GInputStream *stream,
- GCancellable *cancellable,
- GError **error);
-static void soup_http_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data);
-static gssize soup_http_input_stream_read_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error);
-static void soup_http_input_stream_close_async (GInputStream *stream,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer data);
-static gboolean soup_http_input_stream_close_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error);
-
-static void soup_http_input_stream_got_headers (SoupMessage *msg, gpointer stream);
-static void soup_http_input_stream_content_sniffed (SoupMessage *msg, const char *content_type, GHashTable *params, gpointer stream);
-static void soup_http_input_stream_got_chunk (SoupMessage *msg, SoupBuffer *chunk, gpointer stream);
-static void soup_http_input_stream_restarted (SoupMessage *msg, gpointer stream);
-static void soup_http_input_stream_finished (SoupMessage *msg, gpointer stream);
-
-static void
-soup_http_input_stream_finalize (GObject *object)
-{
- SoupHTTPInputStream *stream = SOUP_HTTP_INPUT_STREAM (object);
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- g_object_unref (priv->session);
-
- g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_http_input_stream_got_headers), stream);
- g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_http_input_stream_content_sniffed), stream);
- g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_http_input_stream_got_chunk), stream);
- g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_http_input_stream_restarted), stream);
- g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_http_input_stream_finished), stream);
- g_object_unref (priv->msg);
-
- g_queue_foreach (priv->leftover_queue, (GFunc) soup_buffer_free, NULL);
- g_queue_free (priv->leftover_queue);
-
- g_free (priv->sniffed_content_type);
-
- if (G_OBJECT_CLASS (soup_http_input_stream_parent_class)->finalize)
- (*G_OBJECT_CLASS (soup_http_input_stream_parent_class)->finalize)(object);
-}
-
-static void
-soup_http_input_stream_class_init (SoupHTTPInputStreamClass *klass)
-{
- GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
- GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
-
- g_type_class_add_private (klass, sizeof (SoupHTTPInputStreamPrivate));
-
- gobject_class->finalize = soup_http_input_stream_finalize;
-
- stream_class->read_fn = soup_http_input_stream_read;
- stream_class->close_fn = soup_http_input_stream_close;
- stream_class->read_async = soup_http_input_stream_read_async;
- stream_class->read_finish = soup_http_input_stream_read_finish;
- stream_class->close_async = soup_http_input_stream_close_async;
- stream_class->close_finish = soup_http_input_stream_close_finish;
-}
-
-static void
-soup_http_input_stream_init (SoupHTTPInputStream *stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- priv->leftover_queue = g_queue_new ();
-}
-
-static void
-soup_http_input_stream_queue_message (SoupHTTPInputStream *stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- priv->got_headers = priv->finished = FALSE;
-
- if (soup_session_get_feature_for_message (priv->session, SOUP_TYPE_CONTENT_SNIFFER, priv->msg)) {
- g_signal_connect (priv->msg, "content_sniffed",
- G_CALLBACK (soup_http_input_stream_content_sniffed), stream);
- } else {
- g_signal_connect (priv->msg, "got_headers",
- G_CALLBACK (soup_http_input_stream_got_headers), stream);
- }
- g_signal_connect (priv->msg, "got_chunk",
- G_CALLBACK (soup_http_input_stream_got_chunk), stream);
- g_signal_connect (priv->msg, "restarted",
- G_CALLBACK (soup_http_input_stream_restarted), stream);
- g_signal_connect (priv->msg, "finished",
- G_CALLBACK (soup_http_input_stream_finished), stream);
-
- /* Add an extra ref since soup_session_queue_message steals one */
- g_object_ref (priv->msg);
- soup_session_queue_message (priv->session, priv->msg, NULL, NULL);
-}
-
-/**
- * soup_http_input_stream_new:
- * @session: the #SoupSession to use
- * @msg: the #SoupMessage whose response will be streamed
- *
- * Prepares to send @msg over @session, and returns a #GInputStream
- * that can be used to read the response.
- *
- * @msg may not be sent until the first read call; if you need to look
- * at the status code or response headers before reading the body, you
- * can use soup_http_input_stream_send() or soup_http_input_stream_send_async()
- * to force the message to be sent and the response headers read.
- *
- * If @msg gets a non-2xx result, the first read (or send) will return
- * an error with type %SOUP_HTTP_INPUT_STREAM_HTTP_ERROR.
- *
- * Internally, #SoupHTTPInputStream is implemented using asynchronous I/O,
- * so if you are using the synchronous API (eg,
- * g_input_stream_read()), you should create a new #GMainContext and
- * set it as the %SOUP_SESSION_ASYNC_CONTEXT property on @session. (If
- * you don't, then synchronous #GInputStream calls will cause the main
- * loop to be run recursively.) The async #GInputStream API works fine
- * with %SOUP_SESSION_ASYNC_CONTEXT either set or unset.
- *
- * Returns: a new #GInputStream.
- **/
-GInputStream *
-soup_http_input_stream_new (SoupSession *session, SoupMessage *msg)
-{
- SoupHTTPInputStream *stream;
- SoupHTTPInputStreamPrivate *priv;
-
- g_return_val_if_fail (SOUP_IS_MESSAGE (msg), NULL);
-
- stream = g_object_new (SOUP_TYPE_HTTP_INPUT_STREAM, NULL);
- priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- priv->session = g_object_ref (session);
- priv->async_context = soup_session_get_async_context (session);
- priv->msg = g_object_ref (msg);
-
- return (GInputStream *)stream;
-}
-
-static void
-soup_http_input_stream_got_headers (SoupMessage *msg, gpointer stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- /* If the message is expected to be restarted then we read the
- * whole message first and hope it does get restarted, but
- * if it doesn't, then we stream the body belatedly.
- */
- if (msg->status_code == SOUP_STATUS_UNAUTHORIZED ||
- msg->status_code == SOUP_STATUS_PROXY_UNAUTHORIZED ||
- soup_session_would_redirect (priv->session, msg))
- return;
-
- priv->got_headers = TRUE;
- if (!priv->caller_buffer) {
- /* Not ready to read the body yet */
- soup_session_pause_message (priv->session, msg);
- }
-
- if (priv->got_headers_cb)
- priv->got_headers_cb (stream);
-}
-
-static void
-soup_http_input_stream_content_sniffed (SoupMessage *msg, const char *content_type,
- GHashTable *params, gpointer stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
- GString *sniffed_type;
-
- sniffed_type = g_string_new (content_type);
- if (params) {
- GHashTableIter iter;
- gpointer key, value;
-
- g_hash_table_iter_init (&iter, params);
- while (g_hash_table_iter_next (&iter, &key, &value)) {
- g_string_append (sniffed_type, "; ");
- soup_header_g_string_append_param (sniffed_type, key, value);
- }
- }
- g_free (priv->sniffed_content_type);
- priv->sniffed_content_type = g_string_free (sniffed_type, FALSE);
-
- soup_http_input_stream_got_headers (msg, stream);
-}
-
-static void
-soup_http_input_stream_got_chunk (SoupMessage *msg, SoupBuffer *chunk_buffer,
- gpointer stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
- const gchar *chunk = chunk_buffer->data;
- gsize chunk_size = chunk_buffer->length;
-
- /* Copy what we can into priv->caller_buffer */
- if (priv->caller_bufsize > priv->caller_nread && priv->leftover_queue->length == 0) {
- gsize nread = MIN (chunk_size, priv->caller_bufsize - priv->caller_nread);
-
- memcpy (priv->caller_buffer + priv->caller_nread, chunk, nread);
- priv->caller_nread += nread;
- priv->offset += nread;
- chunk += nread;
- chunk_size -= nread;
- }
-
- if (chunk_size > 0) {
- if (priv->leftover_queue->length > 0) {
- g_queue_push_tail (priv->leftover_queue, soup_buffer_copy (chunk_buffer));
- } else {
- g_queue_push_head (priv->leftover_queue,
- soup_buffer_new_subbuffer (chunk_buffer,
- chunk_buffer->length - chunk_size,
- chunk_size));
- }
- }
-
- if (priv->got_headers) {
- soup_session_pause_message (priv->session, msg);
- if (priv->got_chunk_cb)
- priv->got_chunk_cb (stream);
- }
-}
-
-static void
-soup_http_input_stream_restarted (SoupMessage *msg, gpointer stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
- GList *q;
-
- /* Throw away any pending read data */
- for (q = priv->leftover_queue->head; q; q = q->next)
- soup_buffer_free (q->data);
- g_queue_clear (priv->leftover_queue);
-}
-
-static void
-soup_http_input_stream_finished (SoupMessage *msg, gpointer stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- priv->got_headers = TRUE;
- priv->finished = TRUE;
-
- if (priv->finished_cb)
- priv->finished_cb (stream);
-}
-
-static void
-soup_http_input_stream_cancelled (GCancellable *cancellable,
- gpointer user_data)
-{
- SoupHTTPInputStream *stream = user_data;
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- g_signal_handler_disconnect (cancellable, priv->cancel_id);
- priv->cancel_id = 0;
-
- soup_session_pause_message (priv->session, priv->msg);
- if (priv->cancelled_cb)
- priv->cancelled_cb (G_INPUT_STREAM (stream));
-}
-
-static void
-soup_http_input_stream_prepare_for_io (GInputStream *stream,
- GCancellable *cancellable,
- guchar *buffer,
- gsize count)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- priv->cancellable = cancellable;
- if (cancellable) {
- priv->cancel_id = g_signal_connect (cancellable, "cancelled",
- G_CALLBACK (soup_http_input_stream_cancelled),
- stream);
- }
-
- priv->caller_buffer = buffer;
- priv->caller_bufsize = count;
- priv->caller_nread = 0;
-
- if (priv->got_headers)
- soup_session_unpause_message (priv->session, priv->msg);
-}
-
-static void
-soup_http_input_stream_done_io (GInputStream *stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- if (priv->cancel_id) {
- g_signal_handler_disconnect (priv->cancellable, priv->cancel_id);
- priv->cancel_id = 0;
- }
- priv->cancellable = NULL;
-
- priv->caller_buffer = NULL;
- priv->caller_bufsize = 0;
-}
-
-static gboolean
-set_error_if_http_failed (SoupMessage *msg, GError **error)
-{
- if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
- g_set_error_literal (error, SOUP_HTTP_ERROR,
- msg->status_code, msg->reason_phrase);
- return TRUE;
- }
- return FALSE;
-}
-
-static gsize
-read_from_leftover (SoupHTTPInputStreamPrivate *priv,
- gpointer buffer, gsize bufsize)
-{
- gsize nread;
- SoupBuffer *soup_buffer = (SoupBuffer *) g_queue_peek_head (priv->leftover_queue);
- gboolean fits_in_buffer = soup_buffer->length <= bufsize;
-
- nread = fits_in_buffer ? soup_buffer->length : bufsize;
- memcpy (buffer, soup_buffer->data, nread);
-
- g_queue_pop_head (priv->leftover_queue);
- if (!fits_in_buffer)
- g_queue_push_head (priv->leftover_queue,
- soup_buffer_new_subbuffer (soup_buffer, nread, soup_buffer->length - nread));
- soup_buffer_free (soup_buffer);
-
- priv->offset += nread;
- return nread;
-}
-
-/* This does the work of soup_http_input_stream_send(), assuming that the
- * GInputStream pending flag has already been set. It is also used by
- * soup_http_input_stream_send_async() in some circumstances.
- */
-static gboolean
-soup_http_input_stream_send_internal (GInputStream *stream,
- GCancellable *cancellable,
- GError **error)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- soup_http_input_stream_prepare_for_io (stream, cancellable, NULL, 0);
- while (!priv->finished && !priv->got_headers &&
- !g_cancellable_is_cancelled (cancellable))
- g_main_context_iteration (priv->async_context, TRUE);
- soup_http_input_stream_done_io (stream);
-
- if (g_cancellable_set_error_if_cancelled (cancellable, error))
- return FALSE;
- else if (set_error_if_http_failed (priv->msg, error))
- return FALSE;
- return TRUE;
-}
-
-static void
-send_sync_finished (GInputStream *stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- priv->got_headers_cb = NULL;
- priv->finished_cb = NULL;
-
- /* Wake up the main context iteration */
- soup_add_completion (priv->async_context, NULL, NULL);
-}
-
-/**
- * soup_http_input_stream_send:
- * @httpstream: a #SoupHTTPInputStream
- * @cancellable: optional #GCancellable object, %NULL to ignore.
- * @error: location to store the error occuring, or %NULL to ignore
- *
- * Synchronously sends the HTTP request associated with @stream, and
- * reads the response headers. Call this after soup_http_input_stream_new()
- * and before the first g_input_stream_read() if you want to check the
- * HTTP status code before you start reading.
- *
- * Return value: %TRUE if msg has a successful (2xx) status, %FALSE if
- * not.
- **/
-gboolean
-soup_http_input_stream_send (SoupHTTPInputStream *httpstream,
- GCancellable *cancellable,
- GError **error)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (httpstream);
- GInputStream *istream = (GInputStream *)httpstream;
- gboolean result;
-
- g_return_val_if_fail (SOUP_IS_HTTP_INPUT_STREAM (httpstream), FALSE);
-
- soup_http_input_stream_queue_message (httpstream);
-
- if (!g_input_stream_set_pending (istream, error))
- return FALSE;
-
- priv->got_headers_cb = send_sync_finished;
- priv->finished_cb = send_sync_finished;
-
- result = soup_http_input_stream_send_internal (istream, cancellable, error);
- g_input_stream_clear_pending (istream);
-
- return result;
-}
-
-static gssize
-soup_http_input_stream_read (GInputStream *stream,
- void *buffer,
- gsize count,
- GCancellable *cancellable,
- GError **error)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- /* If there is data leftover from a previous read, return it. */
- if (priv->leftover_queue->length)
- return read_from_leftover (priv, buffer, count);
-
- if (priv->finished)
- return 0;
-
- /* No leftover data, accept one chunk from the network */
- soup_http_input_stream_prepare_for_io (stream, cancellable, buffer, count);
- while (!priv->finished && priv->caller_nread == 0 &&
- !g_cancellable_is_cancelled (cancellable))
- g_main_context_iteration (priv->async_context, TRUE);
- soup_http_input_stream_done_io (stream);
-
- if (priv->caller_nread > 0)
- return priv->caller_nread;
-
- if (g_cancellable_set_error_if_cancelled (cancellable, error))
- return -1;
- else if (set_error_if_http_failed (priv->msg, error))
- return -1;
- else
- return 0;
-}
-
-static gboolean
-soup_http_input_stream_close (GInputStream *stream,
- GCancellable *cancellable,
- GError **error)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- if (!priv->finished) {
- soup_session_unpause_message (priv->session, priv->msg);
- soup_session_cancel_message (priv->session, priv->msg, SOUP_STATUS_CANCELLED);
- }
-
- return TRUE;
-}
-
-static void
-wrapper_callback (GObject *source_object, GAsyncResult *res,
- gpointer user_data)
-{
- GInputStream *stream = G_INPUT_STREAM (source_object);
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- g_input_stream_clear_pending (stream);
- if (priv->outstanding_callback)
- (*priv->outstanding_callback) (source_object, res, user_data);
- priv->outstanding_callback = NULL;
- g_object_unref (stream);
-}
-
-static void
-send_async_finished (GInputStream *stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
- GSimpleAsyncResult *result;
- GError *error = NULL;
-
- if (!g_cancellable_set_error_if_cancelled (priv->cancellable, &error))
- set_error_if_http_failed (priv->msg, &error);
-
- priv->got_headers_cb = NULL;
- priv->finished_cb = NULL;
- soup_http_input_stream_done_io (stream);
-
- result = priv->result;
- priv->result = NULL;
-
- g_simple_async_result_set_op_res_gboolean (result, error == NULL);
- if (error)
- g_simple_async_result_take_error (result, error);
- g_simple_async_result_complete (result);
- g_object_unref (result);
-}
-
-static void
-soup_http_input_stream_send_async_internal (GInputStream *stream,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
-
- g_return_if_fail (priv->async_context == g_main_context_get_thread_default ());
-
- g_object_ref (stream);
- priv->outstanding_callback = callback;
-
- priv->got_headers_cb = send_async_finished;
- priv->finished_cb = send_async_finished;
-
- soup_http_input_stream_prepare_for_io (stream, cancellable, NULL, 0);
- priv->result = g_simple_async_result_new (G_OBJECT (stream),
- wrapper_callback, user_data,
- soup_http_input_stream_send_async);
-}
-
-/**
- * soup_http_input_stream_send_async:
- * @httpstream: a #SoupHTTPInputStream
- * @io_priority: the io priority of the request.
- * @cancellable: optional #GCancellable object, %NULL to ignore.
- * @callback: callback to call when the request is satisfied
- * @user_data: the data to pass to callback function
- *
- * Asynchronously sends the HTTP request associated with @stream, and
- * reads the response headers. Call this after soup_http_input_stream_new()
- * and before the first g_input_stream_read_async() if you want to
- * check the HTTP status code before you start reading.
- **/
-void
-soup_http_input_stream_send_async (SoupHTTPInputStream *httpstream,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- GInputStream *istream = (GInputStream *)httpstream;
- GError *error = NULL;
-
- g_return_if_fail (SOUP_IS_HTTP_INPUT_STREAM (httpstream));
-
- soup_http_input_stream_queue_message (httpstream);
-
- if (!g_input_stream_set_pending (istream, &error)) {
- g_simple_async_report_take_gerror_in_idle (G_OBJECT (httpstream),
- callback,
- user_data,
- error);
- return;
- }
- soup_http_input_stream_send_async_internal (istream, io_priority, cancellable,
- callback, user_data);
-}
-
-/**
- * soup_http_input_stream_send_finish:
- * @httpstream: a #SoupHTTPInputStream
- * @result: a #GAsyncResult.
- * @error: a #GError location to store the error occuring, or %NULL to
- * ignore.
- *
- * Finishes a soup_http_input_stream_send_async() operation.
- *
- * Return value: %TRUE if the message was sent successfully and
- * received a successful status code, %FALSE if not.
- **/
-gboolean
-soup_http_input_stream_send_finish (SoupHTTPInputStream *httpstream,
- GAsyncResult *result,
- GError **error)
-{
- GSimpleAsyncResult *simple;
-
- g_return_val_if_fail (G_IS_SIMPLE_ASYNC_RESULT (result), FALSE);
- simple = G_SIMPLE_ASYNC_RESULT (result);
-
- g_return_val_if_fail (g_simple_async_result_get_source_tag (simple) == soup_http_input_stream_send_async, FALSE);
-
- if (g_simple_async_result_propagate_error (simple, error))
- return FALSE;
-
- return g_simple_async_result_get_op_res_gboolean (simple);
-}
-
-static void
-read_async_done (GInputStream *stream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
- GSimpleAsyncResult *result;
- GError *error = NULL;
-
- result = priv->result;
- priv->result = NULL;
-
- if (g_cancellable_set_error_if_cancelled (priv->cancellable, &error) ||
- set_error_if_http_failed (priv->msg, &error))
- g_simple_async_result_take_error (result, error);
- else
- g_simple_async_result_set_op_res_gssize (result, priv->caller_nread);
-
- priv->got_chunk_cb = NULL;
- priv->finished_cb = NULL;
- priv->cancelled_cb = NULL;
- soup_http_input_stream_done_io (stream);
-
- g_simple_async_result_complete (result);
- g_object_unref (result);
-}
-
-static void
-soup_http_input_stream_read_async (GInputStream *stream,
- void *buffer,
- gsize count,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
- GSimpleAsyncResult *result;
-
- g_return_if_fail (priv->async_context == g_main_context_get_thread_default ());
-
- result = g_simple_async_result_new (G_OBJECT (stream),
- callback, user_data,
- soup_http_input_stream_read_async);
-
- if (priv->leftover_queue->length) {
- gsize nread = read_from_leftover (priv, buffer, count);
- g_simple_async_result_set_op_res_gssize (result, nread);
- g_simple_async_result_complete_in_idle (result);
- g_object_unref (result);
- return;
- }
-
- if (priv->finished) {
- g_simple_async_result_set_op_res_gssize (result, 0);
- g_simple_async_result_complete_in_idle (result);
- g_object_unref (result);
- return;
- }
-
- priv->result = result;
-
- priv->got_chunk_cb = read_async_done;
- priv->finished_cb = read_async_done;
- priv->cancelled_cb = read_async_done;
- soup_http_input_stream_prepare_for_io (stream, cancellable, buffer, count);
-}
-
-static gssize
-soup_http_input_stream_read_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- GSimpleAsyncResult *simple;
-
- g_return_val_if_fail (G_IS_SIMPLE_ASYNC_RESULT (result), -1);
- simple = G_SIMPLE_ASYNC_RESULT (result);
- g_return_val_if_fail (g_simple_async_result_get_source_tag (simple) == soup_http_input_stream_read_async, -1);
-
- return g_simple_async_result_get_op_res_gssize (simple);
-}
-
-static void
-soup_http_input_stream_close_async (GInputStream *stream,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data)
-{
- GSimpleAsyncResult *result;
- gboolean success;
- GError *error = NULL;
-
- result = g_simple_async_result_new (G_OBJECT (stream),
- callback, user_data,
- soup_http_input_stream_close_async);
- success = soup_http_input_stream_close (stream, cancellable, &error);
- g_simple_async_result_set_op_res_gboolean (result, success);
- if (error)
- g_simple_async_result_take_error (result, error);
-
- g_simple_async_result_complete_in_idle (result);
- g_object_unref (result);
-}
-
-static gboolean
-soup_http_input_stream_close_finish (GInputStream *stream,
- GAsyncResult *result,
- GError **error)
-{
- /* Failures handled in generic close_finish code */
- return TRUE;
-}
-
-SoupMessage *
-soup_http_input_stream_get_message (SoupHTTPInputStream *httpstream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (httpstream);
- return priv->msg ? g_object_ref (priv->msg) : NULL;
-}
-
-const char *
-soup_http_input_stream_get_content_type (SoupHTTPInputStream *httpstream)
-{
- SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (httpstream);
-
- if (priv->sniffed_content_type)
- return priv->sniffed_content_type;
- else
- return soup_message_headers_get_content_type (priv->msg->response_headers, NULL);
-
-}
+++ /dev/null
-/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
-/*
- * Copyright (C) 2006, 2007, 2009, 2010 Red Hat, Inc.
- * Copyright (C) 2010 Igalia, S.L.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General
- * Public License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
- * Boston, MA 02111-1307, USA.
- */
-
-#ifndef __SOUP_HTTP_INPUT_STREAM_H__
-#define __SOUP_HTTP_INPUT_STREAM_H__
-
-#include <gio/gio.h>
-#include <libsoup/soup-types.h>
-
-G_BEGIN_DECLS
-
-#define SOUP_TYPE_HTTP_INPUT_STREAM (soup_http_input_stream_get_type ())
-#define SOUP_HTTP_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), SOUP_TYPE_HTTP_INPUT_STREAM, SoupHTTPInputStream))
-#define SOUP_HTTP_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST ((k), SOUP_TYPE_HTTP_INPUT_STREAM, SoupHTTPInputStreamClass))
-#define SOUP_IS_HTTP_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), SOUP_TYPE_HTTP_INPUT_STREAM))
-#define SOUP_IS_HTTP_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), SOUP_TYPE_HTTP_INPUT_STREAM))
-#define SOUP_HTTP_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), SOUP_TYPE_HTTP_INPUT_STREAM, SoupHTTPInputStreamClass))
-
-typedef struct SoupHTTPInputStream SoupHTTPInputStream;
-typedef struct SoupHTTPInputStreamClass SoupHTTPInputStreamClass;
-
-struct SoupHTTPInputStream {
- GInputStream parent;
-};
-
-struct SoupHTTPInputStreamClass {
- GInputStreamClass parent_class;
-
- /* Padding for future expansion */
- void (*_g_reserved1)(void);
- void (*_g_reserved2)(void);
- void (*_g_reserved3)(void);
- void (*_g_reserved4)(void);
- void (*_g_reserved5)(void);
-};
-
-GType soup_http_input_stream_get_type (void) G_GNUC_CONST;
-
-GInputStream *soup_http_input_stream_new (SoupSession *session,
- SoupMessage *msg);
-
-gboolean soup_http_input_stream_send (SoupHTTPInputStream *httpstream,
- GCancellable *cancellable,
- GError **error);
-
-void soup_http_input_stream_send_async (SoupHTTPInputStream *httpstream,
- int io_priority,
- GCancellable *cancellable,
- GAsyncReadyCallback callback,
- gpointer user_data);
-gboolean soup_http_input_stream_send_finish (SoupHTTPInputStream *httpstream,
- GAsyncResult *result,
- GError **error);
-
-SoupMessage *soup_http_input_stream_get_message (SoupHTTPInputStream *httpstream);
-
-const char *soup_http_input_stream_get_content_type (SoupHTTPInputStream *httpstream);
-
-G_END_DECLS
-
-#endif /* __SOUP_HTTP_INPUT_STREAM_H__ */
#include <stdlib.h>
#include <string.h>
+#include <glib/gi18n-lib.h>
+
#include "soup-body-input-stream.h"
#include "soup-body-output-stream.h"
+#include "soup-client-input-stream.h"
#include "soup-connection.h"
#include "soup-content-sniffer-stream.h"
#include "soup-converter-wrapper.h"
typedef enum {
SOUP_MESSAGE_IO_STATE_NOT_STARTED,
+ SOUP_MESSAGE_IO_STATE_ANY = SOUP_MESSAGE_IO_STATE_NOT_STARTED,
SOUP_MESSAGE_IO_STATE_HEADERS,
SOUP_MESSAGE_IO_STATE_BLOCKING,
SOUP_MESSAGE_IO_STATE_BODY_START,
(state != SOUP_MESSAGE_IO_STATE_NOT_STARTED && \
state != SOUP_MESSAGE_IO_STATE_BLOCKING && \
state != SOUP_MESSAGE_IO_STATE_DONE)
+#define SOUP_MESSAGE_IO_STATE_POLLABLE(state) \
+ (SOUP_MESSAGE_IO_STATE_ACTIVE (state) && \
+ state != SOUP_MESSAGE_IO_STATE_BODY_DONE)
typedef struct {
SoupMessageQueueItem *item;
if (io->io_source) {
g_source_destroy (io->io_source);
+ g_source_unref (io->io_source);
io->io_source = NULL;
}
if (io->unpause_source) {
g_source_destroy (io->unpause_source);
+ g_source_unref (io->unpause_source);
io->unpause_source = NULL;
}
&got_lf,
cancellable, error);
io->read_header_buf->len = old_len + MAX (nread, 0);
- if (nread == 0)
- io_error (io->sock, msg, NULL);
+ if (nread == 0) {
+ soup_message_set_status (msg, SOUP_STATUS_MALFORMED);
+ g_set_error_literal (error, G_IO_ERROR,
+ G_IO_ERROR_PARTIAL_INPUT,
+ _("Connection terminated unexpectedly"));
+ }
if (nread <= 0)
return FALSE;
GInputStream *filter;
GSList *d;
- io->body_istream = soup_body_input_stream_new (io->istream,
- io->read_encoding,
- io->read_length);
+ io->body_istream =
+ soup_body_input_stream_new (io->istream,
+ io->read_encoding,
+ io->read_length);
for (d = priv->decoders; d; d = d->next) {
decoder = d->data;
gssize nwrote;
switch (io->write_state) {
- case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
- case SOUP_MESSAGE_IO_STATE_BLOCKING:
- return FALSE;
-
-
case SOUP_MESSAGE_IO_STATE_HEADERS:
if (!io->write_buf->len) {
io->get_headers_cb (msg, io->write_buf,
if (!io->write_chunk) {
io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
if (!io->write_chunk) {
+ g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
soup_message_io_pause (msg);
return FALSE;
}
break;
- case SOUP_MESSAGE_IO_STATE_DONE:
default:
g_return_val_if_reached (FALSE);
}
guint status;
switch (io->read_state) {
- case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
- case SOUP_MESSAGE_IO_STATE_BLOCKING:
- return FALSE;
-
-
case SOUP_MESSAGE_IO_STATE_HEADERS:
if (!read_headers (msg, cancellable, error))
return FALSE;
if (priv->chunk_allocator) {
buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data);
if (!buffer) {
+ g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
soup_message_io_pause (msg);
return FALSE;
}
break;
- case SOUP_MESSAGE_IO_STATE_DONE:
default:
g_return_val_if_reached (FALSE);
}
return TRUE;
}
-static GSource *
+typedef struct {
+ GSource source;
+ SoupMessage *msg;
+} SoupMessageSource;
+
+static gboolean
+message_source_prepare (GSource *source,
+ gint *timeout)
+{
+ *timeout = -1;
+ return FALSE;
+}
+
+static gboolean
+message_source_check (GSource *source)
+{
+ return FALSE;
+}
+
+static gboolean
+message_source_dispatch (GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ SoupMessageSourceFunc func = (SoupMessageSourceFunc)callback;
+ SoupMessageSource *message_source = (SoupMessageSource *)source;
+
+ return (*func) (message_source->msg, user_data);
+}
+
+static void
+message_source_finalize (GSource *source)
+{
+ SoupMessageSource *message_source = (SoupMessageSource *)source;
+
+ g_object_unref (message_source->msg);
+}
+
+static gboolean
+message_source_closure_callback (SoupMessage *msg,
+ gpointer data)
+{
+ GClosure *closure = data;
+
+ GValue param = G_VALUE_INIT;
+ GValue result_value = G_VALUE_INIT;
+ gboolean result;
+
+ g_value_init (&result_value, G_TYPE_BOOLEAN);
+
+ g_value_init (¶m, SOUP_TYPE_MESSAGE);
+ g_value_set_object (¶m, msg);
+
+ g_closure_invoke (closure, &result_value, 1, ¶m, NULL);
+
+ result = g_value_get_boolean (&result_value);
+ g_value_unset (&result_value);
+ g_value_unset (¶m);
+
+ return result;
+}
+
+static GSourceFuncs message_source_funcs =
+{
+ message_source_prepare,
+ message_source_check,
+ message_source_dispatch,
+ message_source_finalize,
+ (GSourceFunc)message_source_closure_callback,
+ (GSourceDummyMarshal)g_cclosure_marshal_generic,
+};
+
+GSource *
soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
- GSourceFunc callback, gpointer user_data)
+ SoupMessageSourceFunc callback, gpointer user_data)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
- GSource *source;
-
- if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state)) {
- source = g_pollable_input_stream_create_source (
- G_POLLABLE_INPUT_STREAM (io->istream), cancellable);
- } else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state)) {
- source = g_pollable_output_stream_create_source (
- G_POLLABLE_OUTPUT_STREAM (io->ostream), cancellable);
+ GSource *base_source, *source;
+ SoupMessageSource *message_source;
+
+ if (io && SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
+ GPollableInputStream *istream;
+
+ if (io->body_istream)
+ istream = G_POLLABLE_INPUT_STREAM (io->body_istream);
+ else
+ istream = G_POLLABLE_INPUT_STREAM (io->istream);
+ base_source = g_pollable_input_stream_create_source (istream, cancellable);
+ } else if (io && SOUP_MESSAGE_IO_STATE_POLLABLE (io->write_state)) {
+ GPollableOutputStream *ostream;
+
+ if (io->body_ostream)
+ ostream = G_POLLABLE_OUTPUT_STREAM (io->body_ostream);
+ else
+ ostream = G_POLLABLE_OUTPUT_STREAM (io->ostream);
+ base_source = g_pollable_output_stream_create_source (ostream, cancellable);
} else
- g_return_val_if_reached (NULL);
-
- g_source_set_callback (source, callback, user_data, NULL);
+ base_source = g_timeout_source_new (0);
+
+ g_source_set_dummy_callback (base_source);
+ source = g_source_new (&message_source_funcs,
+ sizeof (SoupMessageSource));
+ g_source_set_name (source, "SoupMessageSource");
+ message_source = (SoupMessageSource *)source;
+ message_source->msg = g_object_ref (msg);
+
+ g_source_add_child_source (source, base_source);
+ g_source_unref (base_source);
+ g_source_set_callback (source, (GSourceFunc) callback, user_data, NULL);
return source;
}
-static gboolean io_run (GObject *stream, SoupMessage *msg);
-
-static void
-setup_io_source (SoupMessage *msg)
+static gboolean
+io_run_until (SoupMessage *msg,
+ SoupMessageIOState read_state, SoupMessageIOState write_state,
+ GCancellable *cancellable, GError **error)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
+ gboolean progress = TRUE, done;
+
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return FALSE;
+ else if (!io) {
+ g_set_error_literal (error, G_IO_ERROR,
+ G_IO_ERROR_CANCELLED,
+ _("Operation was cancelled"));
+ return FALSE;
+ }
+
+ g_object_ref (msg);
- io->io_source = soup_message_io_get_source (msg, NULL,
- (GSourceFunc)io_run, msg);
- g_source_attach (io->io_source, io->async_context);
- g_source_unref (io->io_source);
+ while (progress && priv->io_data == io && !io->paused &&
+ (io->read_state < read_state || io->write_state < write_state)) {
+
+ if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
+ progress = io_read (msg, cancellable, error);
+ else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
+ progress = io_write (msg, cancellable, error);
+ else
+ progress = FALSE;
+ }
+
+ done = (priv->io_data == io &&
+ io->read_state >= read_state &&
+ io->write_state >= write_state);
+
+ g_object_unref (msg);
+ return done;
}
static gboolean
-io_run (GObject *stream, SoupMessage *msg)
+io_run (SoupMessage *msg, gpointer user_data)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
if (io->io_source) {
g_source_destroy (io->io_source);
+ g_source_unref (io->io_source);
io->io_source = NULL;
}
g_object_ref (msg);
- while (priv->io_data == io && !io->paused) {
- gboolean progress = FALSE;
+ if (io_run_until (msg,
+ SOUP_MESSAGE_IO_STATE_DONE,
+ SOUP_MESSAGE_IO_STATE_DONE,
+ io->cancellable, &error)) {
+ soup_message_io_finished (msg);
+ } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ g_clear_error (&error);
+ io->io_source = soup_message_io_get_source (msg, NULL, io_run, msg);
+ g_source_attach (io->io_source, io->async_context);
+ } else if (error) {
+ io_error (io->sock, msg, error);
+ }
- if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
- progress = io_read (msg, io->cancellable, &error);
- else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
- progress = io_write (msg, io->cancellable, &error);
+ g_object_unref (msg);
+ return FALSE;
+}
- if (!progress)
- break;
- }
+gboolean
+soup_message_io_run_until_write (SoupMessage *msg,
+ GCancellable *cancellable, GError **error)
+{
+ return io_run_until (msg,
+ SOUP_MESSAGE_IO_STATE_ANY,
+ SOUP_MESSAGE_IO_STATE_BODY,
+ cancellable, error);
+}
- if (error) {
- if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
- g_clear_error (&error);
- setup_io_source (msg);
- } else
- io_error (io->sock, msg, error);
- } else if (priv->io_data == io &&
- io->read_state == SOUP_MESSAGE_IO_STATE_DONE &&
- io->write_state == SOUP_MESSAGE_IO_STATE_DONE)
- soup_message_io_finished (msg);
+gboolean
+soup_message_io_run_until_read (SoupMessage *msg,
+ GCancellable *cancellable, GError **error)
+{
+ return io_run_until (msg,
+ SOUP_MESSAGE_IO_STATE_BODY,
+ SOUP_MESSAGE_IO_STATE_ANY,
+ cancellable, error);
+}
+gboolean
+soup_message_io_run_until_finish (SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error)
+{
+ g_object_ref (msg);
+
+ if (!io_run_until (msg,
+ SOUP_MESSAGE_IO_STATE_DONE,
+ SOUP_MESSAGE_IO_STATE_DONE,
+ cancellable, error))
+ return FALSE;
+
+ soup_message_io_finished (msg);
g_object_unref (msg);
- return FALSE;
+ return TRUE;
+}
+
+static void
+client_stream_eof (SoupClientInputStream *stream, gpointer user_data)
+{
+ SoupMessage *msg = user_data;
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupMessageIOData *io = priv->io_data;
+
+ if (io && io->read_state == SOUP_MESSAGE_IO_STATE_BODY)
+ io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
+}
+
+GInputStream *
+soup_message_io_get_response_istream (SoupMessage *msg,
+ GError **error)
+{
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupMessageIOData *io = priv->io_data;
+ GInputStream *client_stream;
+
+ g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, NULL);
+
+ if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
+ g_set_error_literal (error, SOUP_HTTP_ERROR,
+ msg->status_code, msg->reason_phrase);
+ return NULL;
+ }
+
+ client_stream = soup_client_input_stream_new (io->body_istream, msg);
+ g_signal_connect (client_stream, "eof",
+ G_CALLBACK (client_stream_eof), msg);
+
+ return client_stream;
}
io->write_body = item->msg->request_body;
io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- io_run (NULL, item->msg);
+ if (!item->new_api)
+ io_run (item->msg, NULL);
}
void
io->write_body = msg->response_body;
io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- io_run (NULL, msg);
+ io_run (msg, NULL);
}
void
if (io->io_source) {
g_source_destroy (io->io_source);
+ g_source_unref (io->io_source);
io->io_source = NULL;
}
if (io->io_source)
return FALSE;
- io_run (NULL, msg);
+ io_run (msg, NULL);
return FALSE;
}
void soup_message_io_unpause (SoupMessage *msg);
gboolean soup_message_io_in_progress (SoupMessage *msg);
+gboolean soup_message_io_run_until_write (SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error);
+gboolean soup_message_io_run_until_read (SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error);
+gboolean soup_message_io_run_until_finish (SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error);
+
+typedef gboolean (*SoupMessageSourceFunc) (SoupMessage *, gpointer);
+GSource *soup_message_io_get_source (SoupMessage *msg,
+ GCancellable *cancellable,
+ SoupMessageSourceFunc callback,
+ gpointer user_data);
+
+GInputStream *soup_message_io_get_response_istream (SoupMessage *msg,
+ GError **error);
+
gboolean soup_message_disables_feature (SoupMessage *msg,
gpointer feature);
g_object_unref (item->proxy_addr);
if (item->proxy_uri)
soup_uri_free (item->proxy_uri);
+ if (item->result)
+ g_object_unref (item->result);
soup_message_queue_item_set_connection (item, NULL);
g_slice_free (SoupMessageQueueItem, item);
}
SoupAddress *proxy_addr;
SoupURI *proxy_uri;
SoupConnection *conn;
+ GSimpleAsyncResult *result;
guint paused : 1;
+ guint new_api : 1;
guint redirection_count : 31;
SoupMessageQueueItemState state;
#include "soup-request-http.h"
#include "soup-cache.h"
#include "soup-cache-private.h"
-#include "soup-http-input-stream.h"
#include "soup-message.h"
#include "soup-session.h"
+#include "soup-session-private.h"
#include "soup-uri.h"
G_DEFINE_TYPE (SoupRequestHTTP, soup_request_http, SOUP_TYPE_REQUEST)
char *content_type;
};
+static void content_sniffed (SoupMessage *msg,
+ const char *content_type,
+ GHashTable *params,
+ gpointer user_data);
+
static void
soup_request_http_init (SoupRequestHTTP *http)
{
return FALSE;
http->priv->msg = soup_message_new_from_uri (SOUP_METHOD_GET, uri);
+ g_signal_connect (http->priv->msg, "content-sniffed",
+ G_CALLBACK (content_sniffed), http);
return TRUE;
}
{
SoupRequestHTTP *http = SOUP_REQUEST_HTTP (object);
- if (http->priv->msg)
+ if (http->priv->msg) {
+ g_signal_handlers_disconnect_by_func (http->priv->msg,
+ G_CALLBACK (content_sniffed),
+ http);
g_object_unref (http->priv->msg);
+ }
g_free (http->priv->content_type);
GCancellable *cancellable,
GError **error)
{
- GInputStream *httpstream;
SoupRequestHTTP *http = SOUP_REQUEST_HTTP (request);
- httpstream = soup_http_input_stream_new (soup_request_get_session (request), http->priv->msg);
- if (!soup_http_input_stream_send (SOUP_HTTP_INPUT_STREAM (httpstream),
- cancellable, error)) {
- g_object_unref (httpstream);
- return NULL;
- }
- http->priv->content_type = g_strdup (soup_http_input_stream_get_content_type (SOUP_HTTP_INPUT_STREAM (httpstream)));
- return httpstream;
+ return soup_session_send_request (soup_request_get_session (request),
+ http->priv->msg,
+ cancellable, error);
}
static void
http_input_stream_ready_cb (GObject *source, GAsyncResult *result, gpointer user_data)
{
- SoupHTTPInputStream *httpstream = SOUP_HTTP_INPUT_STREAM (source);
SendAsyncData *sadata = user_data;
GError *error = NULL;
+ GInputStream *stream;
- if (soup_http_input_stream_send_finish (httpstream, result, &error)) {
- sadata->http->priv->content_type = g_strdup (soup_http_input_stream_get_content_type (httpstream));
- g_simple_async_result_set_op_res_gpointer (sadata->simple, httpstream, g_object_unref);
+ stream = soup_session_send_request_finish (SOUP_SESSION (source), result, &error);
+ if (stream) {
+ g_simple_async_result_set_op_res_gpointer (sadata->simple, stream, g_object_unref);
} else {
g_simple_async_result_take_error (sadata->simple, error);
- g_object_unref (httpstream);
}
g_simple_async_result_complete (sadata->simple);
free_send_async_data (sadata);
/* The resource was modified, or else it mysteriously disappeared
* from our cache. Either way we need to reload it now.
*/
- stream = soup_http_input_stream_new (session, sadata->original);
- soup_http_input_stream_send_async (SOUP_HTTP_INPUT_STREAM (stream), G_PRIORITY_DEFAULT,
- sadata->cancellable, http_input_stream_ready_cb, sadata);
+ soup_session_send_request_async (session, msg, sadata->cancellable,
+ http_input_stream_ready_cb, sadata);
}
static gboolean
}
}
- stream = soup_http_input_stream_new (session, http->priv->msg);
- soup_http_input_stream_send_async (SOUP_HTTP_INPUT_STREAM (stream),
- G_PRIORITY_DEFAULT, cancellable,
- http_input_stream_ready_cb, sadata);
+ soup_session_send_request_async (session, http->priv->msg, cancellable,
+ http_input_stream_ready_cb, sadata);
}
static GInputStream *
return soup_message_headers_get_content_length (http->priv->msg->response_headers);
}
+static void
+content_sniffed (SoupMessage *msg,
+ const char *content_type,
+ GHashTable *params,
+ gpointer user_data)
+{
+ SoupRequestHTTP *http = user_data;
+ GString *sniffed_type;
+
+ sniffed_type = g_string_new (content_type);
+ if (params) {
+ GHashTableIter iter;
+ gpointer key, value;
+
+ g_hash_table_iter_init (&iter, params);
+ while (g_hash_table_iter_next (&iter, &key, &value)) {
+ g_string_append (sniffed_type, "; ");
+ soup_header_g_string_append_param (sniffed_type, key, value);
+ }
+ }
+ g_free (http->priv->content_type);
+ http->priv->content_type = g_string_free (sniffed_type, FALSE);
+}
+
static const char *
soup_request_http_get_content_type (SoupRequest *request)
{
static void run_queue (SoupSessionAsync *sa);
static void do_idle_run_queue (SoupSession *session);
+static void send_request_running (SoupSession *session, SoupMessageQueueItem *item);
+static void send_request_restarted (SoupSession *session, SoupMessageQueueItem *item);
+static void send_request_finished (SoupSession *session, SoupMessageQueueItem *item);
+
static void queue_message (SoupSession *session, SoupMessage *req,
SoupSessionCallback callback, gpointer user_data);
static guint send_message (SoupSession *session, SoupMessage *req);
{
SoupMessageQueueItem *item = user_data;
+ do_idle_run_queue (item->session);
+
if (item->state != SOUP_MESSAGE_RESTARTING)
item->state = SOUP_MESSAGE_FINISHING;
- do_idle_run_queue (item->session);
}
static void
case SOUP_MESSAGE_READY:
item->state = SOUP_MESSAGE_RUNNING;
soup_session_send_queue_item (session, item, message_completed);
+ if (item->new_api)
+ send_request_running (session, item);
break;
case SOUP_MESSAGE_RESTARTING:
item->state = SOUP_MESSAGE_STARTING;
soup_message_restarted (item->msg);
+ if (item->new_api)
+ send_request_restarted (session, item);
break;
case SOUP_MESSAGE_FINISHING:
soup_session_unqueue_item (session, item);
if (item->callback)
item->callback (session, item->msg, item->callback_data);
+ else if (item->new_api)
+ send_request_finished (session, item);
g_object_unref (item->msg);
do_idle_run_queue (session);
g_object_unref (session);
{
do_idle_run_queue (session);
}
+
+
+static void
+send_request_return_result (SoupMessageQueueItem *item,
+ gpointer stream, GError *error)
+{
+ GSimpleAsyncResult *simple;
+
+ simple = item->result;
+ item->result = NULL;
+
+ if (error)
+ g_simple_async_result_take_error (simple, error);
+ else if (SOUP_STATUS_IS_TRANSPORT_ERROR (item->msg->status_code)) {
+ if (stream)
+ g_object_unref (stream);
+ g_simple_async_result_set_error (simple,
+ SOUP_HTTP_ERROR,
+ item->msg->status_code,
+ "%s",
+ item->msg->reason_phrase);
+ } else
+ g_simple_async_result_set_op_res_gpointer (simple, stream, g_object_unref);
+
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+}
+
+static void
+send_request_restarted (SoupSession *session, SoupMessageQueueItem *item)
+{
+ /* We won't be needing this, then. */
+ g_object_set_data (G_OBJECT (item->msg), "SoupSessionAsync:ostream", NULL);
+}
+
+static void
+send_request_finished (SoupSession *session, SoupMessageQueueItem *item)
+{
+ GMemoryOutputStream *mostream;
+ GInputStream *istream = NULL;
+
+ if (!item->result) {
+ /* Something else already took care of it. */
+ return;
+ }
+
+ mostream = g_object_get_data (G_OBJECT (item->msg), "SoupSessionAsync:ostream");
+ if (mostream && !SOUP_STATUS_IS_TRANSPORT_ERROR (item->msg->status_code)) {
+ gpointer data;
+ gssize size;
+
+ /* We thought it would be requeued, but it wasn't, so
+ * return the original body.
+ */
+ size = g_memory_output_stream_get_data_size (mostream);
+ data = size ? g_memory_output_stream_steal_data (mostream) : g_strdup ("");
+ istream = g_memory_input_stream_new_from_data (data, size, g_free);
+ }
+
+ send_request_return_result (item, istream, NULL);
+}
+
+static void
+send_async_spliced (GObject *source, GAsyncResult *result, gpointer user_data)
+{
+ SoupMessageQueueItem *item = user_data;
+ GInputStream *istream = g_object_get_data (source, "istream");
+
+ GError *error = NULL;
+
+ /* If the message was cancelled, it will be completed via other means */
+ if (g_cancellable_is_cancelled (item->cancellable) ||
+ !item->result) {
+ soup_message_queue_item_unref (item);
+ return;
+ }
+
+ if (g_output_stream_splice_finish (G_OUTPUT_STREAM (source),
+ result, &error) == -1) {
+ send_request_return_result (item, NULL, error);
+ return;
+ }
+
+ /* Otherwise either restarted or finished will eventually be called.
+ * It should be safe to call the sync close() method here since
+ * the message body has already been written.
+ */
+ g_input_stream_close (istream, NULL, NULL);
+ do_idle_run_queue (item->session);
+ soup_message_queue_item_unref (item);
+}
+
+static void
+send_async_maybe_complete (SoupMessageQueueItem *item,
+ GInputStream *stream)
+{
+ if (item->msg->status_code == SOUP_STATUS_UNAUTHORIZED ||
+ item->msg->status_code == SOUP_STATUS_PROXY_UNAUTHORIZED ||
+ soup_session_would_redirect (item->session, item->msg)) {
+ GOutputStream *ostream;
+
+ /* Message may be requeued, so gather the current message body... */
+ ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+ g_object_set_data_full (G_OBJECT (item->msg), "SoupSessionAsync:ostream",
+ ostream, g_object_unref);
+
+ g_object_set_data_full (G_OBJECT (ostream), "istream",
+ stream, g_object_unref);
+
+ /* Give the splice op its own ref on item */
+ soup_message_queue_item_ref (item);
+ g_output_stream_splice_async (ostream, stream,
+ /* We can't use CLOSE_SOURCE because it
+ * might get closed in the wrong thread.
+ */
+ G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+ G_PRIORITY_DEFAULT,
+ item->cancellable,
+ send_async_spliced, item);
+ return;
+ }
+
+ send_request_return_result (item, stream, NULL);
+}
+
+static void try_run_until_read (SoupMessageQueueItem *item);
+
+static gboolean
+read_ready_cb (SoupMessage *msg, gpointer user_data)
+{
+ SoupMessageQueueItem *item = user_data;
+ GError *error = NULL;
+
+ if (g_cancellable_set_error_if_cancelled (item->cancellable, &error)) {
+ send_request_return_result (item, NULL, error);
+ return FALSE;
+ }
+
+ try_run_until_read (item);
+ return FALSE;
+}
+
+static void
+try_run_until_read (SoupMessageQueueItem *item)
+{
+ GError *error = NULL;
+ GInputStream *stream = NULL;
+ GSource *source;
+
+ if (soup_message_io_run_until_read (item->msg, item->cancellable, &error))
+ stream = soup_message_io_get_response_istream (item->msg, &error);
+ if (stream) {
+ send_async_maybe_complete (item, stream);
+ return;
+ }
+
+ if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ send_request_return_result (item, NULL, error);
+ return;
+ }
+
+ g_clear_error (&error);
+ source = soup_message_io_get_source (item->msg, item->cancellable,
+ read_ready_cb, item);
+ g_source_attach (source, soup_session_get_async_context (item->session));
+ g_source_unref (source);
+}
+
+static void
+send_request_running (SoupSession *session, SoupMessageQueueItem *item)
+{
+ try_run_until_read (item);
+}
+
+void
+soup_session_send_request_async (SoupSession *session,
+ SoupMessage *msg,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ SoupMessageQueueItem *item;
+ gboolean use_thread_context;
+
+ g_return_if_fail (SOUP_IS_SESSION_ASYNC (session));
+
+ g_object_get (G_OBJECT (session),
+ SOUP_SESSION_USE_THREAD_CONTEXT, &use_thread_context,
+ NULL);
+ g_return_if_fail (use_thread_context);
+
+ /* Balance out the unref that queuing will eventually do */
+ g_object_ref (msg);
+
+ queue_message (session, msg, NULL, NULL);
+
+ item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
+ g_return_if_fail (item != NULL);
+
+ item->new_api = TRUE;
+ item->result = g_simple_async_result_new (G_OBJECT (session),
+ callback, user_data,
+ soup_session_send_request_async);
+ g_simple_async_result_set_op_res_gpointer (item->result, item, (GDestroyNotify) soup_message_queue_item_unref);
+
+ if (cancellable) {
+ g_object_unref (item->cancellable);
+ item->cancellable = g_object_ref (cancellable);
+ }
+}
+
+GInputStream *
+soup_session_send_request_finish (SoupSession *session,
+ GAsyncResult *result,
+ GError **error)
+{
+ GSimpleAsyncResult *simple;
+
+ g_return_val_if_fail (SOUP_IS_SESSION_ASYNC (session), NULL);
+ g_return_val_if_fail (g_simple_async_result_is_valid (result, G_OBJECT (session), soup_session_send_request_async), NULL);
+
+ simple = G_SIMPLE_ASYNC_RESULT (result);
+ if (g_simple_async_result_propagate_error (simple, error))
+ return NULL;
+ return g_object_ref (g_simple_async_result_get_op_res_gpointer (simple));
+}
SoupMessageQueueItem *item,
guint status_code);
+GInputStream *soup_session_send_request (SoupSession *session,
+ SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error);
+
+void soup_session_send_request_async (SoupSession *session,
+ SoupMessage *msg,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data);
+GInputStream *soup_session_send_request_finish (SoupSession *session,
+ GAsyncResult *result,
+ GError **error);
+
G_END_DECLS
#endif /* SOUP_SESSION_PRIVATE_H */
item->state = SOUP_MESSAGE_READY;
}
+static void process_queue_item (SoupMessageQueueItem *item);
+
+static void
+new_api_message_completed (SoupMessage *msg, gpointer user_data)
+{
+ SoupMessageQueueItem *item = user_data;
+
+ if (item->state != SOUP_MESSAGE_RESTARTING) {
+ item->state = SOUP_MESSAGE_FINISHING;
+ process_queue_item (item);
+ }
+}
+
static void
process_queue_item (SoupMessageQueueItem *item)
{
SoupProxyURIResolver *proxy_resolver;
guint status;
- item->state = SOUP_MESSAGE_STARTING;
+ soup_message_queue_item_ref (item);
+
do {
if (item->paused) {
g_mutex_lock (&priv->lock);
case SOUP_MESSAGE_READY:
item->state = SOUP_MESSAGE_RUNNING;
+
+ if (item->new_api) {
+ soup_session_send_queue_item (item->session, item, new_api_message_completed);
+ goto out;
+ }
+
soup_session_send_queue_item (item->session, item, NULL);
if (item->state != SOUP_MESSAGE_RESTARTING)
item->state = SOUP_MESSAGE_FINISHING;
break;
+ case SOUP_MESSAGE_RUNNING:
+ g_warn_if_fail (item->new_api);
+ item->state = SOUP_MESSAGE_FINISHING;
+ break;
+
case SOUP_MESSAGE_RESTARTING:
item->state = SOUP_MESSAGE_STARTING;
soup_message_restarted (item->msg);
break;
}
} while (item->state != SOUP_MESSAGE_FINISHED);
+
+ out:
+ soup_message_queue_item_unref (item);
}
static gboolean
g_cond_broadcast (&priv->cond);
g_mutex_unlock (&priv->lock);
}
+
+
+GInputStream *
+soup_session_send_request (SoupSession *session,
+ SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error)
+{
+ SoupMessageQueueItem *item;
+ GInputStream *stream = NULL;
+ GOutputStream *ostream;
+ GMemoryOutputStream *mostream;
+ gssize size;
+
+ g_return_val_if_fail (SOUP_IS_SESSION_SYNC (session), NULL);
+
+ SOUP_SESSION_CLASS (soup_session_sync_parent_class)->queue_message (session, msg, NULL, NULL);
+
+ item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
+ g_return_val_if_fail (item != NULL, NULL);
+
+ item->new_api = TRUE;
+
+ while (!stream) {
+ /* Get a connection, etc */
+ process_queue_item (item);
+ if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code))
+ break;
+
+ /* Send request, read headers */
+ if (!soup_message_io_run_until_read (msg, cancellable, error))
+ break;
+
+ stream = soup_message_io_get_response_istream (msg, error);
+ if (!stream)
+ break;
+
+ /* Break if the message doesn't look likely-to-be-requeued */
+ if (msg->status_code != SOUP_STATUS_UNAUTHORIZED &&
+ msg->status_code != SOUP_STATUS_PROXY_UNAUTHORIZED &&
+ !soup_session_would_redirect (session, msg))
+ break;
+
+ /* Gather the current message body... */
+ ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+ if (g_output_stream_splice (ostream, stream,
+ G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
+ G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+ cancellable, error) == -1) {
+ g_object_unref (stream);
+ g_object_unref (ostream);
+ stream = NULL;
+ break;
+ }
+ g_object_unref (stream);
+ stream = NULL;
+
+ /* If the message was requeued, loop */
+ if (item->state == SOUP_MESSAGE_RESTARTING) {
+ g_object_unref (ostream);
+ continue;
+ }
+
+ /* Not requeued, so return the original body */
+ mostream = G_MEMORY_OUTPUT_STREAM (ostream);
+ size = g_memory_output_stream_get_data_size (mostream);
+ stream = g_memory_input_stream_new ();
+ if (size) {
+ g_memory_input_stream_add_data (G_MEMORY_INPUT_STREAM (stream),
+ g_memory_output_stream_steal_data (mostream),
+ size, g_free);
+ }
+ g_object_unref (ostream);
+ }
+
+ if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
+ if (stream) {
+ g_object_unref (stream);
+ stream = NULL;
+ }
+ g_set_error_literal (error, SOUP_HTTP_ERROR, msg->status_code,
+ msg->reason_phrase);
+ }
+
+ soup_message_queue_item_unref (item);
+ return stream;
+}
if (new_uri)
soup_uri_free (new_uri);
- if (invalid) {
- /* Really we should just leave the status as-is,
- * but that would be an API break.
- */
+ if (invalid && !item->new_api) {
soup_message_set_status_full (msg,
SOUP_STATUS_MALFORMED,
"Invalid Redirect URL");
priv = SOUP_SESSION_GET_PRIVATE (session);
item = soup_message_queue_lookup (priv->queue, msg);
g_return_if_fail (item != NULL);
+ g_return_if_fail (!item->new_api);
item->paused = TRUE;
if (item->state == SOUP_MESSAGE_RUNNING)
priv = SOUP_SESSION_GET_PRIVATE (session);
item = soup_message_queue_lookup (priv->queue, msg);
g_return_if_fail (item != NULL);
+ g_return_if_fail (!item->new_api);
item->paused = FALSE;
if (item->state == SOUP_MESSAGE_RUNNING)
libsoup/soup-body-input-stream.c
libsoup/soup-converter-wrapper.c
+libsoup/soup-message-io.c
libsoup/soup-request.c
libsoup/soup-requester.c