From c0414594616131e082e87b78b41542be6785158a Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Wed, 8 Dec 2010 15:56:37 +0100 Subject: [PATCH] soup-message-io: use gio streams rather than SoupSocket Use the socket's input/output streams for the base I/O, and add new SoupBodyInputStream and SoupBodyOutputStream that can be created from them to handle the body of a single message (including handling chunked encoding/decoding). Update chunk-test, which was assuming that the chunk_allocator callback would never be called if the message had a 0-length body; that's no longer true. --- libsoup/Makefile.am | 4 + libsoup/soup-body-input-stream.c | 362 +++++++++++++++ libsoup/soup-body-input-stream.h | 48 ++ libsoup/soup-body-output-stream.c | 322 +++++++++++++ libsoup/soup-body-output-stream.h | 47 ++ libsoup/soup-message-io.c | 920 +++++++++++++++----------------------- libsoup/soup-socket.c | 18 +- libsoup/soup-socket.h | 2 + po/POTFILES.in | 1 + tests/chunk-test.c | 10 - tests/connection-test.c | 17 +- 11 files changed, 1182 insertions(+), 569 deletions(-) create mode 100644 libsoup/soup-body-input-stream.c create mode 100644 libsoup/soup-body-input-stream.h create mode 100644 libsoup/soup-body-output-stream.c create mode 100644 libsoup/soup-body-output-stream.h diff --git a/libsoup/Makefile.am b/libsoup/Makefile.am index 4526ca5..5cfba04 100644 --- a/libsoup/Makefile.am +++ b/libsoup/Makefile.am @@ -95,6 +95,10 @@ libsoup_2_4_la_SOURCES = \ soup-auth-manager.c \ soup-auth-manager-ntlm.h \ soup-auth-manager-ntlm.c \ + soup-body-input-stream.h \ + soup-body-input-stream.c \ + soup-body-output-stream.h \ + soup-body-output-stream.c \ soup-cache.c \ soup-cache-private.h \ soup-connection.h \ diff --git a/libsoup/soup-body-input-stream.c b/libsoup/soup-body-input-stream.c new file mode 100644 index 0000000..2c5d16e --- /dev/null +++ b/libsoup/soup-body-input-stream.c @@ -0,0 +1,362 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ +/* + * soup-body-input-stream.c + * + * Copyright 2012 Red Hat, Inc. + */ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include +#include + +#include + +#include "soup-body-input-stream.h" +#include "soup-enum-types.h" +#include "soup-filter-input-stream.h" +#include "soup-message-headers.h" + +typedef enum { + SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE, + SOUP_BODY_INPUT_STREAM_STATE_CHUNK_END, + SOUP_BODY_INPUT_STREAM_STATE_CHUNK, + SOUP_BODY_INPUT_STREAM_STATE_TRAILERS, + SOUP_BODY_INPUT_STREAM_STATE_DONE +} SoupBodyInputStreamState; + +struct _SoupBodyInputStreamPrivate { + GInputStream *base_stream; + + SoupEncoding encoding; + goffset read_length; + SoupBodyInputStreamState chunked_state; + gboolean eof; +}; + +enum { + PROP_0, + + PROP_ENCODING, + PROP_CONTENT_LENGTH +}; + +static void soup_body_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data); + +G_DEFINE_TYPE_WITH_CODE (SoupBodyInputStream, soup_body_input_stream, G_TYPE_FILTER_INPUT_STREAM, + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, + soup_body_input_stream_pollable_init)) + +static void +soup_body_input_stream_init (SoupBodyInputStream *bistream) +{ + bistream->priv = G_TYPE_INSTANCE_GET_PRIVATE (bistream, + SOUP_TYPE_BODY_INPUT_STREAM, + SoupBodyInputStreamPrivate); + bistream->priv->encoding = SOUP_ENCODING_NONE; +} + +static void +constructed (GObject *object) +{ + SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (object); + + bistream->priv->base_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (bistream)); + + if (bistream->priv->encoding == SOUP_ENCODING_NONE || + (bistream->priv->encoding == SOUP_ENCODING_CONTENT_LENGTH && + bistream->priv->read_length == 0)) + bistream->priv->eof = TRUE; +} + +static void +set_property (GObject *object, guint prop_id, + const GValue *value, GParamSpec *pspec) +{ + SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (object); + + switch (prop_id) { + case PROP_ENCODING: + bistream->priv->encoding = g_value_get_enum (value); + if (bistream->priv->encoding == SOUP_ENCODING_CHUNKED) + bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE; + break; + case PROP_CONTENT_LENGTH: + bistream->priv->read_length = g_value_get_int64 (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +get_property (GObject *object, guint prop_id, + GValue *value, GParamSpec *pspec) +{ + SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (object); + + switch (prop_id) { + case PROP_ENCODING: + g_value_set_enum (value, bistream->priv->encoding); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static gssize +soup_body_input_stream_read_raw (SoupBodyInputStream *bistream, + void *buffer, + gsize count, + gboolean blocking, + GCancellable *cancellable, + GError **error) +{ + gssize nread; + + nread = g_pollable_stream_read (bistream->priv->base_stream, + buffer, count, + blocking, + cancellable, error); + if (nread == 0) { + bistream->priv->eof = TRUE; + if (bistream->priv->encoding != SOUP_ENCODING_EOF) { + g_set_error_literal (error, G_IO_ERROR, + G_IO_ERROR_PARTIAL_INPUT, + _("Connection terminated unexpectedly")); + return -1; + } + } + return nread; +} + +static gssize +soup_body_input_stream_read_chunked (SoupBodyInputStream *bistream, + void *buffer, + gsize count, + gboolean blocking, + GCancellable *cancellable, + GError **error) +{ + SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (bistream->priv->base_stream); + char metabuf[128]; + gssize nread; + gboolean got_line; + +again: + switch (bistream->priv->chunked_state) { + case SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE: + nread = soup_filter_input_stream_read_line ( + fstream, metabuf, sizeof (metabuf), blocking, + &got_line, cancellable, error); + if (nread <= 0) + return nread; + if (!got_line) { + g_set_error_literal (error, G_IO_ERROR, + G_IO_ERROR_PARTIAL_INPUT, + _("Connection terminated unexpectedly")); + return -1; + } + + bistream->priv->read_length = strtoul (metabuf, NULL, 16); + if (bistream->priv->read_length > 0) + bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK; + else + bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_TRAILERS; + break; + + case SOUP_BODY_INPUT_STREAM_STATE_CHUNK: + nread = soup_body_input_stream_read_raw ( + bistream, buffer, + MIN (count, bistream->priv->read_length), + blocking, cancellable, error); + if (nread > 0) { + bistream->priv->read_length -= nread; + if (bistream->priv->read_length == 0) + bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK_END; + } + return nread; + + case SOUP_BODY_INPUT_STREAM_STATE_CHUNK_END: + nread = soup_filter_input_stream_read_line ( + SOUP_FILTER_INPUT_STREAM (bistream->priv->base_stream), + metabuf, sizeof (metabuf), blocking, + &got_line, cancellable, error); + if (nread <= 0) + return nread; + if (!got_line) { + g_set_error_literal (error, G_IO_ERROR, + G_IO_ERROR_PARTIAL_INPUT, + _("Connection terminated unexpectedly")); + return -1; + } + + bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE; + break; + + case SOUP_BODY_INPUT_STREAM_STATE_TRAILERS: + nread = soup_filter_input_stream_read_line ( + fstream, buffer, count, blocking, + &got_line, cancellable, error); + if (nread <= 0) + return nread; + + if (strncmp (buffer, "\r\n", nread) || strncmp (buffer, "\n", nread)) + bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_DONE; + break; + + case SOUP_BODY_INPUT_STREAM_STATE_DONE: + return 0; + } + + goto again; +} + +static gssize +read_internal (GInputStream *stream, + void *buffer, + gsize count, + gboolean blocking, + GCancellable *cancellable, + GError **error) +{ + SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream); + gssize nread; + + if (bistream->priv->eof) + return 0; + + switch (bistream->priv->encoding) { + case SOUP_ENCODING_NONE: + return 0; + + case SOUP_ENCODING_CHUNKED: + return soup_body_input_stream_read_chunked (bistream, buffer, count, + blocking, cancellable, error); + + case SOUP_ENCODING_CONTENT_LENGTH: + case SOUP_ENCODING_EOF: + if (bistream->priv->read_length != -1) { + count = MIN (count, bistream->priv->read_length); + if (count == 0) + return 0; + } + + nread = soup_body_input_stream_read_raw (bistream, buffer, count, + blocking, cancellable, error); + if (bistream->priv->read_length != -1 && nread > 0) + bistream->priv->read_length -= nread; + return nread; + + default: + g_return_val_if_reached (-1); + } +} + +static gssize +soup_body_input_stream_read_fn (GInputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + return read_internal (stream, buffer, count, TRUE, + cancellable, error); +} + +static gboolean +soup_body_input_stream_is_readable (GPollableInputStream *stream) +{ + SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream); + + return bistream->priv->eof || + g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (bistream->priv->base_stream)); +} + +static gssize +soup_body_input_stream_read_nonblocking (GPollableInputStream *stream, + void *buffer, + gsize count, + GError **error) +{ + return read_internal (G_INPUT_STREAM (stream), buffer, count, FALSE, + NULL, error); +} + +static GSource * +soup_body_input_stream_create_source (GPollableInputStream *stream, + GCancellable *cancellable) +{ + SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream); + GSource *base_source, *pollable_source; + + if (bistream->priv->eof) + base_source = g_timeout_source_new (0); + else + base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (bistream->priv->base_stream), cancellable); + g_source_set_dummy_callback (base_source); + + pollable_source = g_pollable_source_new (G_OBJECT (stream)); + g_source_add_child_source (pollable_source, base_source); + g_source_unref (base_source); + + return pollable_source; +} + +static void +soup_body_input_stream_class_init (SoupBodyInputStreamClass *stream_class) +{ + GObjectClass *object_class = G_OBJECT_CLASS (stream_class); + GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (stream_class); + + g_type_class_add_private (stream_class, sizeof (SoupBodyInputStreamPrivate)); + + object_class->constructed = constructed; + object_class->set_property = set_property; + object_class->get_property = get_property; + + input_stream_class->read_fn = soup_body_input_stream_read_fn; + + g_object_class_install_property ( + object_class, PROP_ENCODING, + g_param_spec_enum ("encoding", + "Encoding", + "Message body encoding", + SOUP_TYPE_ENCODING, + SOUP_ENCODING_NONE, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property ( + object_class, PROP_CONTENT_LENGTH, + g_param_spec_int64 ("content-length", + "Content-Length", + "Message body Content-Length", + -1, G_MAXINT64, -1, + G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); +} + +static void +soup_body_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, + gpointer interface_data) +{ + pollable_interface->is_readable = soup_body_input_stream_is_readable; + pollable_interface->read_nonblocking = soup_body_input_stream_read_nonblocking; + pollable_interface->create_source = soup_body_input_stream_create_source; +} + +GInputStream * +soup_body_input_stream_new (SoupFilterInputStream *base_stream, + SoupEncoding encoding, + goffset content_length) +{ + return g_object_new (SOUP_TYPE_BODY_INPUT_STREAM, + "base-stream", base_stream, + "close-base-stream", FALSE, + "encoding", encoding, + "content-length", content_length, + NULL); +} diff --git a/libsoup/soup-body-input-stream.h b/libsoup/soup-body-input-stream.h new file mode 100644 index 0000000..9e0c08e --- /dev/null +++ b/libsoup/soup-body-input-stream.h @@ -0,0 +1,48 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ +/* + * Copyright 2012 Red Hat, Inc. + */ + +#ifndef SOUP_BODY_INPUT_STREAM_H +#define SOUP_BODY_INPUT_STREAM_H 1 + +#include "soup-types.h" +#include "soup-filter-input-stream.h" +#include "soup-message-headers.h" + +G_BEGIN_DECLS + +#define SOUP_TYPE_BODY_INPUT_STREAM (soup_body_input_stream_get_type ()) +#define SOUP_BODY_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_BODY_INPUT_STREAM, SoupBodyInputStream)) +#define SOUP_BODY_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_BODY_INPUT_STREAM, SoupBodyInputStreamClass)) +#define SOUP_IS_BODY_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_BODY_INPUT_STREAM)) +#define SOUP_IS_BODY_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((obj), SOUP_TYPE_BODY_INPUT_STREAM)) +#define SOUP_BODY_INPUT_STREAM_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_BODY_INPUT_STREAM, SoupBodyInputStreamClass)) + +typedef struct _SoupBodyInputStreamPrivate SoupBodyInputStreamPrivate; + +typedef struct { + GFilterInputStream parent; + + SoupBodyInputStreamPrivate *priv; +} SoupBodyInputStream; + +typedef struct { + GFilterInputStreamClass parent_class; + + /* Padding for future expansion */ + void (*_libsoup_reserved1) (void); + void (*_libsoup_reserved2) (void); + void (*_libsoup_reserved3) (void); + void (*_libsoup_reserved4) (void); +} SoupBodyInputStreamClass; + +GType soup_body_input_stream_get_type (void); + +GInputStream *soup_body_input_stream_new (SoupFilterInputStream *base_stream, + SoupEncoding encoding, + goffset content_length); + +G_END_DECLS + +#endif /* SOUP_BODY_INPUT_STREAM_H */ diff --git a/libsoup/soup-body-output-stream.c b/libsoup/soup-body-output-stream.c new file mode 100644 index 0000000..269ec71 --- /dev/null +++ b/libsoup/soup-body-output-stream.c @@ -0,0 +1,322 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ +/* + * soup-body-output-stream.c + * + * Copyright 2012 Red Hat, Inc. + */ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include +#include +#include + +#include "soup-body-output-stream.h" +#include "soup-enum-types.h" +#include "soup-message-headers.h" + +typedef enum { + SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_SIZE, + SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_END, + SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK, + SOUP_BODY_OUTPUT_STREAM_STATE_TRAILERS, + SOUP_BODY_OUTPUT_STREAM_STATE_DONE +} SoupBodyOutputStreamState; + +struct _SoupBodyOutputStreamPrivate { + GOutputStream *base_stream; + char buf[20]; + + SoupEncoding encoding; + goffset write_length; + goffset written; + SoupBodyOutputStreamState chunked_state; + gboolean eof; +}; + +enum { + PROP_0, + + PROP_ENCODING, + PROP_CONTENT_LENGTH +}; + +static void soup_body_output_stream_pollable_init (GPollableOutputStreamInterface *pollable_interface, gpointer interface_data); + +G_DEFINE_TYPE_WITH_CODE (SoupBodyOutputStream, soup_body_output_stream, G_TYPE_FILTER_OUTPUT_STREAM, + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, + soup_body_output_stream_pollable_init)) + + +static void +soup_body_output_stream_init (SoupBodyOutputStream *stream) +{ + stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, + SOUP_TYPE_BODY_OUTPUT_STREAM, + SoupBodyOutputStreamPrivate); +} + +static void +constructed (GObject *object) +{ + SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (object); + + bostream->priv->base_stream = g_filter_output_stream_get_base_stream (G_FILTER_OUTPUT_STREAM (bostream)); +} + +static void +set_property (GObject *object, guint prop_id, + const GValue *value, GParamSpec *pspec) +{ + SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (object); + + switch (prop_id) { + case PROP_ENCODING: + bostream->priv->encoding = g_value_get_enum (value); + if (bostream->priv->encoding == SOUP_ENCODING_CHUNKED) + bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_SIZE; + break; + case PROP_CONTENT_LENGTH: + bostream->priv->write_length = g_value_get_uint64 (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +get_property (GObject *object, guint prop_id, + GValue *value, GParamSpec *pspec) +{ + SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (object); + + switch (prop_id) { + case PROP_ENCODING: + g_value_set_enum (value, bostream->priv->encoding); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static gssize +soup_body_output_stream_write_raw (SoupBodyOutputStream *bostream, + const void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + gssize nwrote, my_count; + + /* If the caller tries to write too much to a Content-Length + * encoded stream, we truncate at the right point, but keep + * accepting additional data until they stop. + */ + if (bostream->priv->write_length) { + my_count = MIN (count, bostream->priv->write_length - bostream->priv->written); + if (my_count == 0) { + bostream->priv->eof = TRUE; + return count; + } + } else + my_count = count; + + nwrote = g_output_stream_write (bostream->priv->base_stream, + buffer, my_count, + cancellable, error); + + if (nwrote > 0 && bostream->priv->write_length) + bostream->priv->written += nwrote; + + if (nwrote == my_count && my_count != count) + nwrote = count; + + return nwrote; +} + +static gssize +soup_body_output_stream_write_chunked (SoupBodyOutputStream *bostream, + const void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + char *buf = bostream->priv->buf; + gssize nwrote, len; + +again: + len = strlen (buf); + if (len) { + nwrote = g_output_stream_write (bostream->priv->base_stream, + buf, len, cancellable, error); + if (nwrote < 0) + return nwrote; + memmove (buf, buf + nwrote, len + 1 - nwrote); + goto again; + } + + switch (bostream->priv->chunked_state) { + case SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_SIZE: + snprintf (buf, sizeof (bostream->priv->buf), + "%lx\r\n", (gulong)count); + len = strlen (buf); + + if (count > 0) + bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK; + else + bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_TRAILERS; + break; + + case SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK: + nwrote = g_output_stream_write (bostream->priv->base_stream, + buffer, count, cancellable, error); + if (nwrote < (gssize)count) + return nwrote; + + bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_END; + break; + + case SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_END: + strncpy (buf, "\r\n", sizeof (bostream->priv->buf)); + len = 2; + bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_DONE; + break; + + case SOUP_BODY_OUTPUT_STREAM_STATE_TRAILERS: + strncpy (buf, "\r\n", sizeof (bostream->priv->buf)); + len = 2; + bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_DONE; + break; + + case SOUP_BODY_OUTPUT_STREAM_STATE_DONE: + bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_SIZE; + return count; + } + + goto again; +} + +static gssize +soup_body_output_stream_write_fn (GOutputStream *stream, + const void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (stream); + + if (bostream->priv->eof) + return count; + + switch (bostream->priv->encoding) { + case SOUP_ENCODING_CHUNKED: + return soup_body_output_stream_write_chunked (bostream, buffer, count, + cancellable, error); + + default: + return soup_body_output_stream_write_raw (bostream, buffer, count, + cancellable, error); + } +} + +static gboolean +soup_body_output_stream_close_fn (GOutputStream *stream, + GCancellable *cancellable, + GError **error) +{ + SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (stream); + + if (bostream->priv->encoding == SOUP_ENCODING_CHUNKED) { + if (soup_body_output_stream_write_chunked (bostream, NULL, 0, cancellable, error) == -1) + return FALSE; + } + + return G_OUTPUT_STREAM_CLASS (soup_body_output_stream_parent_class)->close_fn (stream, cancellable, error); +} + +static gboolean +soup_body_output_stream_is_writable (GPollableOutputStream *stream) +{ + SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (stream); + + return bostream->priv->eof || + g_pollable_output_stream_is_writable (G_POLLABLE_OUTPUT_STREAM (bostream->priv->base_stream)); +} + +static GSource * +soup_body_output_stream_create_source (GPollableOutputStream *stream, + GCancellable *cancellable) +{ + SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (stream); + GSource *base_source, *pollable_source; + + if (bostream->priv->eof) + base_source = g_timeout_source_new (0); + else + base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (bostream->priv->base_stream), cancellable); + g_source_set_dummy_callback (base_source); + + pollable_source = g_pollable_source_new (G_OBJECT (stream)); + g_source_add_child_source (pollable_source, base_source); + g_source_unref (base_source); + + return pollable_source; +} + +static void +soup_body_output_stream_class_init (SoupBodyOutputStreamClass *stream_class) +{ + GObjectClass *object_class = G_OBJECT_CLASS (stream_class); + GOutputStreamClass *output_stream_class = G_OUTPUT_STREAM_CLASS (stream_class); + + g_type_class_add_private (stream_class, sizeof (SoupBodyOutputStreamPrivate)); + + object_class->constructed = constructed; + object_class->set_property = set_property; + object_class->get_property = get_property; + + output_stream_class->write_fn = soup_body_output_stream_write_fn; + output_stream_class->close_fn = soup_body_output_stream_close_fn; + + g_object_class_install_property ( + object_class, PROP_ENCODING, + g_param_spec_enum ("encoding", + "Encoding", + "Message body encoding", + SOUP_TYPE_ENCODING, + SOUP_ENCODING_NONE, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property ( + object_class, PROP_CONTENT_LENGTH, + g_param_spec_uint64 ("content-length", + "Content-Length", + "Message body Content-Length", + 0, G_MAXUINT64, 0, + G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); +} + +static void +soup_body_output_stream_pollable_init (GPollableOutputStreamInterface *pollable_interface, + gpointer interface_data) +{ + pollable_interface->is_writable = soup_body_output_stream_is_writable; + pollable_interface->create_source = soup_body_output_stream_create_source; +} + +GOutputStream * +soup_body_output_stream_new (GOutputStream *base_stream, + SoupEncoding encoding, + goffset content_length) +{ + return g_object_new (SOUP_TYPE_BODY_OUTPUT_STREAM, + "base-stream", base_stream, + "close-base-stream", FALSE, + "encoding", encoding, + "content-length", content_length, + NULL); +} diff --git a/libsoup/soup-body-output-stream.h b/libsoup/soup-body-output-stream.h new file mode 100644 index 0000000..8bd8970 --- /dev/null +++ b/libsoup/soup-body-output-stream.h @@ -0,0 +1,47 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ +/* + * Copyright 2012 Red Hat, Inc. + */ + +#ifndef SOUP_BODY_OUTPUT_STREAM_H +#define SOUP_BODY_OUTPUT_STREAM_H 1 + +#include "soup-types.h" +#include "soup-message-headers.h" + +G_BEGIN_DECLS + +#define SOUP_TYPE_BODY_OUTPUT_STREAM (soup_body_output_stream_get_type ()) +#define SOUP_BODY_OUTPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_BODY_OUTPUT_STREAM, SoupBodyOutputStream)) +#define SOUP_BODY_OUTPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_BODY_OUTPUT_STREAM, SoupBodyOutputStreamClass)) +#define SOUP_IS_BODY_OUTPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_BODY_OUTPUT_STREAM)) +#define SOUP_IS_BODY_OUTPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((obj), SOUP_TYPE_BODY_OUTPUT_STREAM)) +#define SOUP_BODY_OUTPUT_STREAM_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_BODY_OUTPUT_STREAM, SoupBodyOutputStreamClass)) + +typedef struct _SoupBodyOutputStreamPrivate SoupBodyOutputStreamPrivate; + +typedef struct { + GFilterOutputStream parent; + + SoupBodyOutputStreamPrivate *priv; +} SoupBodyOutputStream; + +typedef struct { + GFilterOutputStreamClass parent_class; + + /* Padding for future expansion */ + void (*_libsoup_reserved1) (void); + void (*_libsoup_reserved2) (void); + void (*_libsoup_reserved3) (void); + void (*_libsoup_reserved4) (void); +} SoupBodyOutputStreamClass; + +GType soup_body_output_stream_get_type (void); + +GOutputStream *soup_body_output_stream_new (GOutputStream *base_stream, + SoupEncoding encoding, + goffset content_length); + +G_END_DECLS + +#endif /* SOUP_BODY_OUTPUT_STREAM_H */ diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c index cf2a2e3..aabb902 100644 --- a/libsoup/soup-message-io.c +++ b/libsoup/soup-message-io.c @@ -12,7 +12,10 @@ #include #include +#include "soup-body-input-stream.h" +#include "soup-body-output-stream.h" #include "soup-connection.h" +#include "soup-filter-input-stream.h" #include "soup-message.h" #include "soup-message-private.h" #include "soup-message-queue.h" @@ -28,11 +31,10 @@ typedef enum { SOUP_MESSAGE_IO_STATE_NOT_STARTED, SOUP_MESSAGE_IO_STATE_HEADERS, SOUP_MESSAGE_IO_STATE_BLOCKING, + SOUP_MESSAGE_IO_STATE_BODY_START, SOUP_MESSAGE_IO_STATE_BODY, - SOUP_MESSAGE_IO_STATE_CHUNK_SIZE, - SOUP_MESSAGE_IO_STATE_CHUNK, - SOUP_MESSAGE_IO_STATE_CHUNK_END, - SOUP_MESSAGE_IO_STATE_TRAILERS, + SOUP_MESSAGE_IO_STATE_BODY_DATA, + SOUP_MESSAGE_IO_STATE_BODY_DONE, SOUP_MESSAGE_IO_STATE_FINISHING, SOUP_MESSAGE_IO_STATE_DONE } SoupMessageIOState; @@ -43,17 +45,23 @@ typedef enum { state != SOUP_MESSAGE_IO_STATE_DONE) typedef struct { - SoupSocket *sock; SoupMessageQueueItem *item; SoupMessageIOMode mode; GCancellable *cancellable; + SoupSocket *sock; + SoupFilterInputStream *istream; + GInputStream *body_istream; + GOutputStream *ostream; + GOutputStream *body_ostream; + GMainContext *async_context; + gboolean blocking; + SoupMessageIOState read_state; SoupEncoding read_encoding; - GByteArray *read_meta_buf; + GByteArray *read_header_buf; SoupMessageBody *read_body; goffset read_length; - gboolean read_eof_ok; gboolean need_content_sniffed, need_got_chunk; SoupMessageBody *sniff_data; @@ -67,8 +75,9 @@ typedef struct { goffset write_length; goffset written; - guint read_tag, write_tag; + GSource *io_source; GSource *unpause_source; + gboolean paused; SoupMessageGetHeadersFn get_headers_cb; SoupMessageParseHeadersFn parse_headers_cb; @@ -83,8 +92,8 @@ typedef struct { */ #define dummy_to_make_emacs_happy { #define SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK { gboolean cancelled; g_object_ref (msg); -#define SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || (!io->read_tag && !io->write_tag)) return; } -#define SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED(val) cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || (!io->read_tag && !io->write_tag)) return val; } +#define SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || io->paused) return; } +#define SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED(val) cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || io->paused) return val; } #define RESPONSE_BLOCK_SIZE 8192 @@ -103,10 +112,20 @@ soup_message_io_cleanup (SoupMessage *msg) if (io->sock) g_object_unref (io->sock); + if (io->istream) + g_object_remove_weak_pointer (G_OBJECT (io->istream), (gpointer *)&io->istream); + if (io->ostream) + g_object_remove_weak_pointer (G_OBJECT (io->ostream), (gpointer *)&io->ostream); + if (io->body_istream) + g_object_unref (io->body_istream); + if (io->body_ostream) + g_object_unref (io->body_ostream); + if (io->async_context) + g_main_context_unref (io->async_context); if (io->item) soup_message_queue_item_unref (io->item); - g_byte_array_free (io->read_meta_buf, TRUE); + g_byte_array_free (io->read_header_buf, TRUE); g_string_free (io->write_buf, TRUE); if (io->write_chunk) @@ -127,13 +146,9 @@ soup_message_io_stop (SoupMessage *msg) if (!io) return; - if (io->read_tag) { - g_signal_handler_disconnect (io->sock, io->read_tag); - io->read_tag = 0; - } - if (io->write_tag) { - g_signal_handler_disconnect (io->sock, io->write_tag); - io->write_tag = 0; + if (io->io_source) { + g_source_destroy (io->io_source); + io->io_source = NULL; } if (io->unpause_source) { @@ -145,9 +160,6 @@ soup_message_io_stop (SoupMessage *msg) soup_socket_disconnect (io->sock); } -#define SOUP_MESSAGE_IO_EOL "\r\n" -#define SOUP_MESSAGE_IO_EOL_LEN 2 - void soup_message_io_finished (SoupMessage *msg) { @@ -163,8 +175,6 @@ soup_message_io_finished (SoupMessage *msg) g_object_unref (msg); } -static void io_read (SoupSocket *sock, SoupMessage *msg); - static gboolean request_is_idempotent (SoupMessage *msg) { @@ -184,7 +194,7 @@ io_error (SoupSocket *sock, SoupMessage *msg, GError *error) error->message); } else if (io->mode == SOUP_MESSAGE_IO_CLIENT && io->read_state <= SOUP_MESSAGE_IO_STATE_HEADERS && - io->read_meta_buf->len == 0 && + io->read_header_buf->len == 0 && soup_connection_get_ever_used (io->item->conn) && !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT) && request_is_idempotent (msg)) { @@ -248,87 +258,54 @@ io_handle_sniffing (SoupMessage *msg, gboolean done_reading) return TRUE; } -/* Reads data from io->sock into io->read_meta_buf. If @to_blank is - * %TRUE, it reads up until a blank line ("CRLF CRLF" or "LF LF"). - * Otherwise, it reads up until a single CRLF or LF. - * - * This function is used to read metadata, and read_body_chunk() is - * used to read the message body contents. - * - * read_metadata, read_body_chunk, and write_data all use the same - * convention for return values: if they return %TRUE, it means - * they've completely finished the requested read/write, and the - * caller should move on to the next step. If they return %FALSE, it - * means that either (a) the socket returned SOUP_SOCKET_WOULD_BLOCK, - * so the caller should give up for now and wait for the socket to - * emit a signal, or (b) the socket returned an error, and io_error() - * was called to process it and cancel the I/O. So either way, if the - * function returns %FALSE, the caller should return immediately. - */ static gboolean -read_metadata (SoupMessage *msg, gboolean to_blank) +read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error) { SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); SoupMessageIOData *io = priv->io_data; - SoupSocketIOStatus status; - guchar read_buf[RESPONSE_BLOCK_SIZE]; - gsize nread; + gssize nread, old_len; gboolean got_lf; - GError *error = NULL; while (1) { - status = soup_socket_read_until (io->sock, read_buf, - sizeof (read_buf), - "\n", 1, &nread, &got_lf, - io->cancellable, &error); - switch (status) { - case SOUP_SOCKET_OK: - g_byte_array_append (io->read_meta_buf, read_buf, nread); - break; - - case SOUP_SOCKET_EOF: - /* More lame server handling... deal with - * servers that don't send the final chunk. - */ - if (io->read_state == SOUP_MESSAGE_IO_STATE_CHUNK_SIZE && - io->read_meta_buf->len == 0) { - g_byte_array_append (io->read_meta_buf, - (guchar *)"0\r\n", 3); - got_lf = TRUE; - break; - } else if (io->read_state == SOUP_MESSAGE_IO_STATE_TRAILERS && - io->read_meta_buf->len == 0) { - g_byte_array_append (io->read_meta_buf, - (guchar *)"\r\n", 2); - got_lf = TRUE; - break; - } - /* else fall through */ - - case SOUP_SOCKET_ERROR: - io_error (io->sock, msg, error); - return FALSE; - - case SOUP_SOCKET_WOULD_BLOCK: + old_len = io->read_header_buf->len; + g_byte_array_set_size (io->read_header_buf, old_len + RESPONSE_BLOCK_SIZE); + nread = soup_filter_input_stream_read_line (io->istream, + io->read_header_buf->data + old_len, + RESPONSE_BLOCK_SIZE, + io->blocking, + &got_lf, + cancellable, error); + io->read_header_buf->len = old_len + MAX (nread, 0); + if (nread == 0) + io_error (io->sock, msg, NULL); + if (nread <= 0) return FALSE; - } if (got_lf) { - if (!to_blank) - break; - if (nread == 1 && io->read_meta_buf->len >= 2 && - !strncmp ((char *)io->read_meta_buf->data + - io->read_meta_buf->len - 2, + if (nread == 1 && old_len >= 2 && + !strncmp ((char *)io->read_header_buf->data + + io->read_header_buf->len - 2, "\n\n", 2)) break; - else if (nread == 2 && io->read_meta_buf->len >= 3 && - !strncmp ((char *)io->read_meta_buf->data + - io->read_meta_buf->len - 3, + else if (nread == 2 && old_len >= 3 && + !strncmp ((char *)io->read_header_buf->data + + io->read_header_buf->len - 3, "\n\r\n", 3)) break; } } + /* We need to "rewind" io->read_header_buf back one line. + * That SHOULD be two characters (CR LF), but if the + * web server was stupid, it might only be one. + */ + if (io->read_header_buf->len < 3 || + io->read_header_buf->data[io->read_header_buf->len - 2] == '\n') + io->read_header_buf->len--; + else + io->read_header_buf->len -= 2; + io->read_header_buf->data[io->read_header_buf->len] = '\0'; + return TRUE; } @@ -445,165 +422,6 @@ content_decode (SoupMessage *msg, SoupBuffer *buf) return buf; } -/* Reads as much message body data as is available on io->sock (but no - * further than the end of the current message body or chunk). On a - * successful read, emits "got_chunk" (possibly multiple times), and - * (unless told not to) appends the chunk to io->read_body. - * - * See the note at read_metadata() for an explanation of the return - * value. - */ -static gboolean -read_body_chunk (SoupMessage *msg) -{ - SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); - SoupMessageIOData *io = priv->io_data; - SoupSocketIOStatus status; - guchar *stack_buf = NULL; - gsize len; - gboolean read_to_eof = (io->read_encoding == SOUP_ENCODING_EOF); - gsize nread; - GError *error = NULL; - SoupBuffer *buffer; - - if (!io_handle_sniffing (msg, FALSE)) - return FALSE; - - while (read_to_eof || io->read_length > 0) { - if (priv->chunk_allocator) { - buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data); - if (!buffer) { - soup_message_io_pause (msg); - return FALSE; - } - } else { - if (!stack_buf) - stack_buf = alloca (RESPONSE_BLOCK_SIZE); - buffer = soup_buffer_new (SOUP_MEMORY_TEMPORARY, - stack_buf, - RESPONSE_BLOCK_SIZE); - } - - if (read_to_eof) - len = buffer->length; - else - len = MIN (buffer->length, io->read_length); - - status = soup_socket_read (io->sock, - (guchar *)buffer->data, len, - &nread, io->cancellable, &error); - - if (status == SOUP_SOCKET_OK && nread) { - buffer->length = nread; - io->read_length -= nread; - - buffer = content_decode (msg, buffer); - if (!buffer) - continue; - - soup_message_body_got_chunk (io->read_body, buffer); - - if (io->need_content_sniffed) { - soup_message_body_append_buffer (io->sniff_data, buffer); - soup_buffer_free (buffer); - io->need_got_chunk = TRUE; - if (!io_handle_sniffing (msg, FALSE)) - return FALSE; - continue; - } - - SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK; - soup_message_got_chunk (msg, buffer); - soup_buffer_free (buffer); - SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE); - continue; - } - - soup_buffer_free (buffer); - switch (status) { - case SOUP_SOCKET_OK: - break; - - case SOUP_SOCKET_EOF: - if (io->read_eof_ok) { - io->read_length = 0; - return TRUE; - } - /* else fall through */ - - case SOUP_SOCKET_ERROR: - io_error (io->sock, msg, error); - return FALSE; - - case SOUP_SOCKET_WOULD_BLOCK: - return FALSE; - } - } - - return TRUE; -} - -/* Attempts to write @len bytes from @data. See the note at - * read_metadata() for an explanation of the return value. - */ -static gboolean -write_data (SoupMessage *msg, const char *data, guint len, gboolean body) -{ - SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); - SoupMessageIOData *io = priv->io_data; - SoupSocketIOStatus status; - gsize nwrote; - GError *error = NULL; - SoupBuffer *chunk; - const char *start; - - while (len > io->written) { - status = soup_socket_write (io->sock, - data + io->written, - len - io->written, - &nwrote, - io->cancellable, &error); - switch (status) { - case SOUP_SOCKET_EOF: - case SOUP_SOCKET_ERROR: - io_error (io->sock, msg, error); - return FALSE; - - case SOUP_SOCKET_WOULD_BLOCK: - return FALSE; - - case SOUP_SOCKET_OK: - start = data + io->written; - io->written += nwrote; - - if (body) { - if (io->write_length) - io->write_length -= nwrote; - - chunk = soup_buffer_new (SOUP_MEMORY_TEMPORARY, - start, nwrote); - SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK; - soup_message_wrote_body_data (msg, chunk); - soup_buffer_free (chunk); - SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE); - } - break; - } - } - - io->written = 0; - return TRUE; -} - -static inline SoupMessageIOState -io_body_state (SoupEncoding encoding) -{ - if (encoding == SOUP_ENCODING_CHUNKED) - return SOUP_MESSAGE_IO_STATE_CHUNK_SIZE; - else - return SOUP_MESSAGE_IO_STATE_BODY; -} - /* * There are two request/response formats: the basic request/response, * possibly with one or more unsolicited informational responses (such @@ -630,16 +448,24 @@ io_body_state (SoupEncoding encoding) * W:DONE / R:DONE R:DONE / W:DONE */ -static void -io_write (SoupSocket *sock, SoupMessage *msg) +/* Attempts to push forward the writing side of @msg's I/O. Returns + * %TRUE if it manages to make some progress, and it is likely that + * further progress can be made. Returns %FALSE if it has reached a + * stopping point of some sort (need input from the application, + * socket not writable, write is complete, etc). + */ +static gboolean +io_write (SoupMessage *msg, GCancellable *cancellable, GError **error) { SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); SoupMessageIOData *io = priv->io_data; + SoupBuffer *chunk; + gssize nwrote; - write_more: switch (io->write_state) { case SOUP_MESSAGE_IO_STATE_NOT_STARTED: - return; + case SOUP_MESSAGE_IO_STATE_BLOCKING: + return FALSE; case SOUP_MESSAGE_IO_STATE_HEADERS: @@ -647,32 +473,29 @@ io_write (SoupSocket *sock, SoupMessage *msg) io->get_headers_cb (msg, io->write_buf, &io->write_encoding, io->header_data); - if (!io->write_buf->len) { - soup_message_io_pause (msg); - return; - } } - if (!write_data (msg, io->write_buf->str, - io->write_buf->len, FALSE)) - return; + while (io->written < io->write_buf->len) { + nwrote = g_pollable_stream_write (io->ostream, + io->write_buf->str + io->written, + io->write_buf->len - io->written, + io->blocking, + cancellable, error); + if (nwrote == -1) + return FALSE; + io->written += nwrote; + } + io->written = 0; g_string_truncate (io->write_buf, 0); - if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH) { - SoupMessageHeaders *hdrs = - (io->mode == SOUP_MESSAGE_IO_CLIENT) ? - msg->request_headers : msg->response_headers; - io->write_length = soup_message_headers_get_content_length (hdrs); - } - if (io->mode == SOUP_MESSAGE_IO_SERVER && SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) { if (msg->status_code == SOUP_STATUS_CONTINUE) { /* Stop and wait for the body now */ io->write_state = SOUP_MESSAGE_IO_STATE_BLOCKING; - io->read_state = io_body_state (io->read_encoding); + io->read_state = SOUP_MESSAGE_IO_STATE_BODY; } else { /* We just wrote a 1xx response * header, so stay in STATE_HEADERS. @@ -682,13 +505,26 @@ io_write (SoupSocket *sock, SoupMessage *msg) * response.) */ } - } else if (io->mode == SOUP_MESSAGE_IO_CLIENT && - soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) { + + soup_message_wrote_informational (msg); + soup_message_cleanup_response (msg); + break; + } + + if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH) { + SoupMessageHeaders *hdrs = + (io->mode == SOUP_MESSAGE_IO_CLIENT) ? + msg->request_headers : msg->response_headers; + io->write_length = soup_message_headers_get_content_length (hdrs); + } + + if (io->mode == SOUP_MESSAGE_IO_CLIENT && + soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) { /* Need to wait for the Continue response */ io->write_state = SOUP_MESSAGE_IO_STATE_BLOCKING; io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS; } else { - io->write_state = io_body_state (io->write_encoding); + io->write_state = SOUP_MESSAGE_IO_STATE_BODY_START; /* If the client was waiting for a Continue * but we sent something else, then they're @@ -696,39 +532,26 @@ io_write (SoupSocket *sock, SoupMessage *msg) */ if (io->mode == SOUP_MESSAGE_IO_SERVER && io->read_state == SOUP_MESSAGE_IO_STATE_BLOCKING) - io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING; + io->read_state = SOUP_MESSAGE_IO_STATE_DONE; } - SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK; - if (SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) { - soup_message_wrote_informational (msg); - soup_message_cleanup_response (msg); - } else - soup_message_wrote_headers (msg); - SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED; + soup_message_wrote_headers (msg); break; - case SOUP_MESSAGE_IO_STATE_BLOCKING: - io_read (sock, msg); - - /* If io_read reached a point where we could write - * again, it would have recursively called io_write. - * So (a) we don't need to try to keep writing, and - * (b) we can't anyway, because msg may have been - * destroyed. - */ - return; + case SOUP_MESSAGE_IO_STATE_BODY_START: + io->body_ostream = soup_body_output_stream_new (io->ostream, + io->write_encoding, + io->write_length); + io->write_state = SOUP_MESSAGE_IO_STATE_BODY; + break; case SOUP_MESSAGE_IO_STATE_BODY: - if (!io->write_length && io->write_encoding != SOUP_ENCODING_EOF) { - wrote_body: - io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING; - - SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK; - soup_message_wrote_body (msg); - SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED; + if (!io->write_length && + io->write_encoding != SOUP_ENCODING_EOF && + io->write_encoding != SOUP_ENCODING_CHUNKED) { + io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE; break; } @@ -736,164 +559,114 @@ io_write (SoupSocket *sock, SoupMessage *msg) io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset); if (!io->write_chunk) { soup_message_io_pause (msg); - return; + return FALSE; + } + if (!io->write_chunk->length) { + io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE; + break; } - if (io->write_chunk->length > io->write_length && - io->write_encoding != SOUP_ENCODING_EOF) { - /* App is trying to write more than it - * claimed it would; we have to truncate. - */ - SoupBuffer *truncated = - soup_buffer_new_subbuffer (io->write_chunk, - 0, io->write_length); - soup_buffer_free (io->write_chunk); - io->write_chunk = truncated; - } else if (io->write_encoding == SOUP_ENCODING_EOF && - !io->write_chunk->length) - goto wrote_body; } - if (!write_data (msg, io->write_chunk->data, - io->write_chunk->length, TRUE)) - return; - - if (io->mode == SOUP_MESSAGE_IO_SERVER || - priv->msg_flags & SOUP_MESSAGE_CAN_REBUILD) - soup_message_body_wrote_chunk (io->write_body, io->write_chunk); - io->write_body_offset += io->write_chunk->length; - soup_buffer_free (io->write_chunk); - io->write_chunk = NULL; + nwrote = g_pollable_stream_write (io->body_ostream, + io->write_chunk->data + io->written, + io->write_chunk->length - io->written, + io->blocking, + cancellable, error); + if (nwrote == -1) + return FALSE; - SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK; - soup_message_wrote_chunk (msg); - SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED; - break; + chunk = soup_buffer_new_subbuffer (io->write_chunk, + io->written, nwrote); + io->written += nwrote; + if (io->write_length) + io->write_length -= nwrote; - case SOUP_MESSAGE_IO_STATE_CHUNK_SIZE: - if (!io->write_chunk) { - io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset); - if (!io->write_chunk) { - soup_message_io_pause (msg); - return; - } - g_string_append_printf (io->write_buf, "%lx\r\n", - (unsigned long) io->write_chunk->length); - io->write_body_offset += io->write_chunk->length; - } + if (io->written == io->write_chunk->length) + io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DATA; - if (!write_data (msg, io->write_buf->str, - io->write_buf->len, FALSE)) - return; + soup_message_wrote_body_data (msg, chunk); + soup_buffer_free (chunk); + break; - g_string_truncate (io->write_buf, 0); + case SOUP_MESSAGE_IO_STATE_BODY_DATA: + io->written = 0; if (io->write_chunk->length == 0) { - /* The last chunk has no CHUNK_END... */ - io->write_state = SOUP_MESSAGE_IO_STATE_TRAILERS; + io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE; break; } - io->write_state = SOUP_MESSAGE_IO_STATE_CHUNK; - /* fall through */ - - - case SOUP_MESSAGE_IO_STATE_CHUNK: - if (!write_data (msg, io->write_chunk->data, - io->write_chunk->length, TRUE)) - return; - if (io->mode == SOUP_MESSAGE_IO_SERVER || priv->msg_flags & SOUP_MESSAGE_CAN_REBUILD) soup_message_body_wrote_chunk (io->write_body, io->write_chunk); + io->write_body_offset += io->write_chunk->length; soup_buffer_free (io->write_chunk); io->write_chunk = NULL; - io->write_state = SOUP_MESSAGE_IO_STATE_CHUNK_END; - - SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK; + io->write_state = SOUP_MESSAGE_IO_STATE_BODY; soup_message_wrote_chunk (msg); - SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED; - - /* fall through */ - - - case SOUP_MESSAGE_IO_STATE_CHUNK_END: - if (!write_data (msg, SOUP_MESSAGE_IO_EOL, - SOUP_MESSAGE_IO_EOL_LEN, FALSE)) - return; - - io->write_state = SOUP_MESSAGE_IO_STATE_CHUNK_SIZE; break; - case SOUP_MESSAGE_IO_STATE_TRAILERS: - if (!write_data (msg, SOUP_MESSAGE_IO_EOL, - SOUP_MESSAGE_IO_EOL_LEN, FALSE)) - return; + case SOUP_MESSAGE_IO_STATE_BODY_DONE: + if (io->body_ostream) { + if (!g_output_stream_close (io->body_ostream, cancellable, error)) + return FALSE; + g_clear_object (&io->body_ostream); + } io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING; - - SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK; soup_message_wrote_body (msg); - SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED; - /* fall through */ + break; case SOUP_MESSAGE_IO_STATE_FINISHING: - if (io->write_tag) { - g_signal_handler_disconnect (io->sock, io->write_tag); - io->write_tag = 0; - } io->write_state = SOUP_MESSAGE_IO_STATE_DONE; - if (io->mode == SOUP_MESSAGE_IO_CLIENT) { + if (io->mode == SOUP_MESSAGE_IO_CLIENT) io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS; - io_read (sock, msg); - } else - soup_message_io_finished (msg); - return; + break; case SOUP_MESSAGE_IO_STATE_DONE: default: - g_return_if_reached (); + g_return_val_if_reached (FALSE); } - goto write_more; + return TRUE; } -static void -io_read (SoupSocket *sock, SoupMessage *msg) +/* Attempts to push forward the reading side of @msg's I/O. Returns + * %TRUE if it manages to make some progress, and it is likely that + * further progress can be made. Returns %FALSE if it has reached a + * stopping point of some sort (need input from the application, + * socket not readable, read is complete, etc). + */ +static gboolean +io_read (SoupMessage *msg, GCancellable *cancellable, GError **error) { SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); SoupMessageIOData *io = priv->io_data; + guchar *stack_buf = NULL; + gssize nread; + SoupBuffer *buffer; guint status; - read_more: switch (io->read_state) { case SOUP_MESSAGE_IO_STATE_NOT_STARTED: - return; + case SOUP_MESSAGE_IO_STATE_BLOCKING: + return FALSE; case SOUP_MESSAGE_IO_STATE_HEADERS: - if (!read_metadata (msg, TRUE)) - return; - - /* We need to "rewind" io->read_meta_buf back one line. - * That SHOULD be two characters (CR LF), but if the - * web server was stupid, it might only be one. - */ - if (io->read_meta_buf->len < 3 || - io->read_meta_buf->data[io->read_meta_buf->len - 2] == '\n') - io->read_meta_buf->len--; - else - io->read_meta_buf->len -= 2; - io->read_meta_buf->data[io->read_meta_buf->len] = '\0'; - status = io->parse_headers_cb (msg, (char *)io->read_meta_buf->data, - io->read_meta_buf->len, + if (!read_headers (msg, cancellable, error)) + return FALSE; + + status = io->parse_headers_cb (msg, (char *)io->read_header_buf->data, + io->read_header_buf->len, &io->read_encoding, io->header_data); - g_byte_array_set_size (io->read_meta_buf, 0); + g_byte_array_set_size (io->read_header_buf, 0); if (status != SOUP_STATUS_OK) { /* Either we couldn't parse the headers, or they @@ -910,26 +683,6 @@ io_read (SoupSocket *sock, SoupMessage *msg) break; } - if (io->read_encoding == SOUP_ENCODING_EOF) - io->read_eof_ok = TRUE; - - if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) { - SoupMessageHeaders *hdrs = - (io->mode == SOUP_MESSAGE_IO_CLIENT) ? - msg->response_headers : msg->request_headers; - io->read_length = soup_message_headers_get_content_length (hdrs); - - if (io->mode == SOUP_MESSAGE_IO_CLIENT && - !soup_message_is_keepalive (msg)) { - /* Some servers suck and send - * incorrect Content-Length values, so - * allow EOF termination in this case - * (iff the message is too short) too. - */ - io->read_eof_ok = TRUE; - } - } - if (io->mode == SOUP_MESSAGE_IO_CLIENT && SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) { if (msg->status_code == SOUP_STATUS_CONTINUE && @@ -938,11 +691,18 @@ io_read (SoupSocket *sock, SoupMessage *msg) io->read_state = SOUP_MESSAGE_IO_STATE_BLOCKING; io->write_state = - io_body_state (io->write_encoding); + SOUP_MESSAGE_IO_STATE_BODY_START; } else { /* Just stay in HEADERS */ io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS; } + + /* Informational responses have no bodies, so + * bail out here rather than parsing encoding, etc + */ + soup_message_got_informational (msg); + soup_message_cleanup_response (msg); + break; } else if (io->mode == SOUP_MESSAGE_IO_SERVER && soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) { /* The client requested a Continue response. The @@ -953,7 +713,7 @@ io_read (SoupSocket *sock, SoupMessage *msg) io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS; io->read_state = SOUP_MESSAGE_IO_STATE_BLOCKING; } else { - io->read_state = io_body_state (io->read_encoding); + io->read_state = SOUP_MESSAGE_IO_STATE_BODY; /* If the client was waiting for a Continue * but got something else, then it's done @@ -964,121 +724,187 @@ io_read (SoupSocket *sock, SoupMessage *msg) io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING; } - if (io->mode == SOUP_MESSAGE_IO_CLIENT && - SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) { - SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK; - soup_message_got_informational (msg); - soup_message_cleanup_response (msg); - SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED; - } else { - SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK; - soup_message_got_headers (msg); - SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED; - } - break; - + if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) { + SoupMessageHeaders *hdrs = + (io->mode == SOUP_MESSAGE_IO_CLIENT) ? + msg->response_headers : msg->request_headers; + io->read_length = soup_message_headers_get_content_length (hdrs); - case SOUP_MESSAGE_IO_STATE_BLOCKING: - io_write (sock, msg); + if (io->mode == SOUP_MESSAGE_IO_CLIENT && + !soup_message_is_keepalive (msg)) { + /* Some servers suck and send + * incorrect Content-Length values, so + * allow EOF termination in this case + * (iff the message is too short) too. + */ + io->read_encoding = SOUP_ENCODING_EOF; + } + } else + io->read_length = -1; - /* As in the io_write case, we *must* return here. */ - return; + io->body_istream = soup_body_input_stream_new (SOUP_FILTER_INPUT_STREAM (io->istream), + io->read_encoding, + io->read_length); + soup_message_got_headers (msg); + break; case SOUP_MESSAGE_IO_STATE_BODY: - if (!read_body_chunk (msg)) - return; - - got_body: - if (!io_handle_sniffing (msg, TRUE)) { - /* If the message was paused (as opposed to - * cancelled), we need to make sure we wind up - * back here when it's unpaused, even if it - * was doing a chunked or EOF-terminated read - * before. - */ - if (io == priv->io_data) { - io->read_state = SOUP_MESSAGE_IO_STATE_BODY; - io->read_encoding = SOUP_ENCODING_CONTENT_LENGTH; - io->read_length = 0; + if (!io_handle_sniffing (msg, FALSE)) + return FALSE; + + if (priv->chunk_allocator) { + buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data); + if (!buffer) { + soup_message_io_pause (msg); + return FALSE; } - return; + } else { + if (!stack_buf) + stack_buf = alloca (RESPONSE_BLOCK_SIZE); + buffer = soup_buffer_new (SOUP_MEMORY_TEMPORARY, + stack_buf, + RESPONSE_BLOCK_SIZE); } - io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING; + nread = g_pollable_stream_read (io->body_istream, + (guchar *)buffer->data, + buffer->length, + io->blocking, + cancellable, error); + if (nread > 0) { + buffer->length = nread; + buffer = content_decode (msg, buffer); + if (!buffer) + break; - SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK; - soup_message_got_body (msg); - SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED; - break; + soup_message_body_got_chunk (io->read_body, buffer); + if (io->need_content_sniffed) { + soup_message_body_append_buffer (io->sniff_data, buffer); + soup_buffer_free (buffer); + io->need_got_chunk = TRUE; + if (!io_handle_sniffing (msg, FALSE)) + return FALSE; + break; + } - case SOUP_MESSAGE_IO_STATE_CHUNK_SIZE: - if (!read_metadata (msg, FALSE)) - return; + soup_message_got_chunk (msg, buffer); + soup_buffer_free (buffer); + break; + } - io->read_length = strtoul ((char *)io->read_meta_buf->data, NULL, 16); - g_byte_array_set_size (io->read_meta_buf, 0); + soup_buffer_free (buffer); + if (nread == -1) + return FALSE; - if (io->read_length > 0) - io->read_state = SOUP_MESSAGE_IO_STATE_CHUNK; - else - io->read_state = SOUP_MESSAGE_IO_STATE_TRAILERS; + /* else nread == 0 */ + io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE; break; - case SOUP_MESSAGE_IO_STATE_CHUNK: - if (!read_body_chunk (msg)) - return; + case SOUP_MESSAGE_IO_STATE_BODY_DONE: + if (!io_handle_sniffing (msg, TRUE)) + return FALSE; - io->read_state = SOUP_MESSAGE_IO_STATE_CHUNK_END; + io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING; + soup_message_got_body (msg); break; - case SOUP_MESSAGE_IO_STATE_CHUNK_END: - if (!read_metadata (msg, FALSE)) - return; + case SOUP_MESSAGE_IO_STATE_FINISHING: + io->read_state = SOUP_MESSAGE_IO_STATE_DONE; - g_byte_array_set_size (io->read_meta_buf, 0); - io->read_state = SOUP_MESSAGE_IO_STATE_CHUNK_SIZE; + if (io->mode == SOUP_MESSAGE_IO_SERVER) + io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS; break; - case SOUP_MESSAGE_IO_STATE_TRAILERS: - if (!read_metadata (msg, FALSE)) - return; + case SOUP_MESSAGE_IO_STATE_DONE: + default: + g_return_val_if_reached (FALSE); + } - if (io->read_meta_buf->len <= SOUP_MESSAGE_IO_EOL_LEN) - goto got_body; + return TRUE; +} - /* FIXME: process trailers */ - g_byte_array_set_size (io->read_meta_buf, 0); - break; +static GSource * +soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable, + GSourceFunc callback, gpointer user_data) +{ + SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); + SoupMessageIOData *io = priv->io_data; + GSource *source; + + if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state)) { + source = g_pollable_input_stream_create_source ( + G_POLLABLE_INPUT_STREAM (io->istream), cancellable); + } else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state)) { + source = g_pollable_output_stream_create_source ( + G_POLLABLE_OUTPUT_STREAM (io->ostream), cancellable); + } else + g_return_val_if_reached (NULL); + g_source_set_callback (source, callback, user_data, NULL); + return source; +} - case SOUP_MESSAGE_IO_STATE_FINISHING: - if (io->read_tag) { - g_signal_handler_disconnect (io->sock, io->read_tag); - io->read_tag = 0; - } - io->read_state = SOUP_MESSAGE_IO_STATE_DONE; +static gboolean io_run (GObject *stream, SoupMessage *msg); - if (io->mode == SOUP_MESSAGE_IO_SERVER) { - io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS; - io_write (sock, msg); - } else - soup_message_io_finished (msg); - return; +static void +setup_io_source (SoupMessage *msg) +{ + SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); + SoupMessageIOData *io = priv->io_data; + io->io_source = soup_message_io_get_source (msg, NULL, + (GSourceFunc)io_run, msg); + g_source_attach (io->io_source, io->async_context); + g_source_unref (io->io_source); +} - case SOUP_MESSAGE_IO_STATE_DONE: - default: - g_return_if_reached (); +static gboolean +io_run (GObject *stream, SoupMessage *msg) +{ + SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); + SoupMessageIOData *io = priv->io_data; + GError *error = NULL; + + if (io->io_source) { + g_source_destroy (io->io_source); + io->io_source = NULL; + } + + g_object_ref (msg); + + while (priv->io_data == io && !io->paused) { + gboolean progress = FALSE; + + if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state)) + progress = io_read (msg, io->cancellable, &error); + else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state)) + progress = io_write (msg, io->cancellable, &error); + + if (!progress) + break; } - goto read_more; + if (error) { + if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { + g_clear_error (&error); + setup_io_source (msg); + } else + io_error (io->sock, msg, error); + } else if (priv->io_data == io && + io->read_state == SOUP_MESSAGE_IO_STATE_DONE && + io->write_state == SOUP_MESSAGE_IO_STATE_DONE) + soup_message_io_finished (msg); + + g_object_unref (msg); + return FALSE; } + static SoupMessageIOData * new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode, SoupMessageGetHeadersFn get_headers_cb, @@ -1089,9 +915,9 @@ new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode, { SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); SoupMessageIOData *io; + gboolean non_blocking, use_thread_context; io = g_slice_new0 (SoupMessageIOData); - io->sock = g_object_ref (sock); io->mode = mode; io->get_headers_cb = get_headers_cb; io->parse_headers_cb = parse_headers_cb; @@ -1099,13 +925,32 @@ new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode, io->completion_cb = completion_cb; io->completion_data = completion_data; - io->read_meta_buf = g_byte_array_new (); - io->write_buf = g_string_new (NULL); + io->sock = g_object_ref (sock); + io->istream = SOUP_FILTER_INPUT_STREAM (soup_socket_get_input_stream (sock)); + if (io->istream) + g_object_add_weak_pointer (G_OBJECT (io->istream), (gpointer *)&io->istream); + io->ostream = soup_socket_get_output_stream (sock); + if (io->ostream) + g_object_add_weak_pointer (G_OBJECT (io->ostream), (gpointer *)&io->ostream); + + g_object_get (io->sock, + SOUP_SOCKET_FLAG_NONBLOCKING, &non_blocking, + SOUP_SOCKET_USE_THREAD_CONTEXT, &use_thread_context, + NULL); + io->blocking = !non_blocking; + + if (use_thread_context) { + io->async_context = g_main_context_get_thread_default (); + if (io->async_context) + g_main_context_ref (io->async_context); + } else { + g_object_get (io->sock, + SOUP_SOCKET_ASYNC_CONTEXT, &io->async_context, + NULL); + } - io->read_tag = g_signal_connect (io->sock, "readable", - G_CALLBACK (io_read), msg); - io->write_tag = g_signal_connect (io->sock, "writable", - G_CALLBACK (io_write), msg); + io->read_header_buf = g_byte_array_new (); + io->write_buf = g_string_new (NULL); io->read_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED; io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED; @@ -1139,7 +984,7 @@ soup_message_io_client (SoupMessageQueueItem *item, io->write_body = item->msg->request_body; io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS; - io_write (sock, item->msg); + io_run (NULL, item->msg); } void @@ -1160,7 +1005,7 @@ soup_message_io_server (SoupMessage *msg, SoupSocket *sock, io->write_body = msg->response_body; io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS; - io_read (sock, msg); + io_run (NULL, msg); } void @@ -1171,19 +1016,17 @@ soup_message_io_pause (SoupMessage *msg) g_return_if_fail (io != NULL); - if (io->write_tag) { - g_signal_handler_disconnect (io->sock, io->write_tag); - io->write_tag = 0; - } - if (io->read_tag) { - g_signal_handler_disconnect (io->sock, io->read_tag); - io->read_tag = 0; + if (io->io_source) { + g_source_destroy (io->io_source); + io->io_source = NULL; } if (io->unpause_source) { g_source_destroy (io->unpause_source); io->unpause_source = NULL; } + + io->paused = TRUE; } static gboolean @@ -1194,25 +1037,12 @@ io_unpause_internal (gpointer msg) g_return_val_if_fail (io != NULL, FALSE); io->unpause_source = NULL; + io->paused = FALSE; - if (io->write_tag || io->read_tag) + if (io->io_source) return FALSE; - if (io->write_state != SOUP_MESSAGE_IO_STATE_DONE) { - io->write_tag = g_signal_connect (io->sock, "writable", - G_CALLBACK (io_write), msg); - } - - if (io->read_state != SOUP_MESSAGE_IO_STATE_DONE) { - io->read_tag = g_signal_connect (io->sock, "readable", - G_CALLBACK (io_read), msg); - } - - if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state)) - io_write (io->sock, msg); - else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state)) - io_read (io->sock, msg); - + io_run (NULL, msg); return FALSE; } @@ -1221,32 +1051,16 @@ soup_message_io_unpause (SoupMessage *msg) { SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg); SoupMessageIOData *io = priv->io_data; - gboolean non_blocking, use_thread_context; - GMainContext *async_context; g_return_if_fail (io != NULL); - g_object_get (io->sock, - SOUP_SOCKET_FLAG_NONBLOCKING, &non_blocking, - SOUP_SOCKET_USE_THREAD_CONTEXT, &use_thread_context, - NULL); - if (use_thread_context) - async_context = g_main_context_ref_thread_default (); - else { - g_object_get (io->sock, - SOUP_SOCKET_ASYNC_CONTEXT, &async_context, - NULL); - } - - if (non_blocking) { + if (!io->blocking) { if (!io->unpause_source) { io->unpause_source = soup_add_completion ( - async_context, io_unpause_internal, msg); + io->async_context, io_unpause_internal, msg); } } else io_unpause_internal (msg); - if (async_context) - g_main_context_unref (async_context); } /** diff --git a/libsoup/soup-socket.c b/libsoup/soup-socket.c index 049be92..2d72b38 100644 --- a/libsoup/soup-socket.c +++ b/libsoup/soup-socket.c @@ -129,8 +129,6 @@ disconnect_internal (SoupSocket *sock, gboolean close) if (G_IS_TLS_CONNECTION (priv->conn)) g_signal_handlers_disconnect_by_func (priv->conn, soup_socket_peer_certificate_changed, sock); g_clear_object (&priv->conn); - g_clear_object (&priv->istream); - g_clear_object (&priv->ostream); } if (priv->read_src) { @@ -1325,6 +1323,22 @@ soup_socket_get_remote_address (SoupSocket *sock) return priv->remote_addr; } +GInputStream * +soup_socket_get_input_stream (SoupSocket *sock) +{ + g_return_val_if_fail (SOUP_IS_SOCKET (sock), NULL); + + return SOUP_SOCKET_GET_PRIVATE (sock)->istream; +} + +GOutputStream * +soup_socket_get_output_stream (SoupSocket *sock) +{ + g_return_val_if_fail (SOUP_IS_SOCKET (sock), NULL); + + return SOUP_SOCKET_GET_PRIVATE (sock)->ostream; +} + static gboolean socket_read_watch (GObject *pollable, gpointer user_data) diff --git a/libsoup/soup-socket.h b/libsoup/soup-socket.h index dc6b59c..5cbf14a 100644 --- a/libsoup/soup-socket.h +++ b/libsoup/soup-socket.h @@ -85,6 +85,8 @@ gboolean soup_socket_is_connected (SoupSocket *sock); SoupAddress *soup_socket_get_local_address (SoupSocket *sock); SoupAddress *soup_socket_get_remote_address (SoupSocket *sock); +GInputStream *soup_socket_get_input_stream (SoupSocket *sock); +GOutputStream *soup_socket_get_output_stream (SoupSocket *sock); typedef enum { SOUP_SOCKET_OK, diff --git a/po/POTFILES.in b/po/POTFILES.in index c43b943..4115bb0 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -1,2 +1,3 @@ +libsoup/soup-body-input-stream.c libsoup/soup-request.c libsoup/soup-requester.c diff --git a/tests/chunk-test.c b/tests/chunk-test.c index 3805fb7..c3eecc4 100644 --- a/tests/chunk-test.c +++ b/tests/chunk-test.c @@ -21,15 +21,6 @@ typedef struct { gboolean streaming; } PutTestData; -static SoupBuffer * -error_chunk_allocator (SoupMessage *msg, gsize max_len, gpointer user_data) -{ - /* This should never be called, because there is no response body. */ - debug_printf (1, " error_chunk_allocator called!\n"); - errors++; - return soup_buffer_new (SOUP_MEMORY_TAKE, g_malloc (100), 100); -} - static void write_next_chunk (SoupMessage *msg, gpointer user_data) { @@ -191,7 +182,6 @@ do_request_test (SoupSession *session, SoupURI *base_uri, RequestTestFlags flags msg = soup_message_new_from_uri ("PUT", uri); soup_message_headers_set_encoding (msg->request_headers, SOUP_ENCODING_CHUNKED); soup_message_body_set_accumulate (msg->request_body, FALSE); - soup_message_set_chunk_allocator (msg, error_chunk_allocator, NULL, NULL); if (flags & HACKY_STREAMING) { g_signal_connect (msg, "wrote_chunk", G_CALLBACK (write_next_chunk_streaming_hack), &ptd); diff --git a/tests/connection-test.c b/tests/connection-test.c index 545bf10..7c6fb5a 100644 --- a/tests/connection-test.c +++ b/tests/connection-test.c @@ -23,11 +23,20 @@ static void close_socket (SoupMessage *msg, gpointer user_data) { SoupSocket *sock = user_data; + int sockfd; - soup_socket_disconnect (sock); - - /* But also add the missing data to the message now, so - * SoupServer can clean up after itself properly. + /* Actually calling soup_socket_disconnect() here would cause + * us to leak memory, so just shutdown the socket instead. + */ + sockfd = soup_socket_get_fd (sock); +#ifdef G_OS_WIN32 + shutdown (sockfd, SD_SEND); +#else + shutdown (sockfd, SHUT_WR); +#endif + + /* Then add the missing data to the message now, so SoupServer + * can clean up after itself properly. */ soup_message_body_append (msg->response_body, SOUP_MEMORY_STATIC, "foo", 3); -- 2.7.4