From 6b9cbd9736486821d189aeaed1e8d327aed2b2a7 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Sun, 15 Jan 2012 10:17:21 -0500 Subject: [PATCH] Add SoupFilterInputStream SoupFilterInputStream is basically a subset of GDataInputStream, plus non-blocking read_line()/read_until(). Wrap the existing socket istream member with a SoupFilterInputStream, and use its buffering rather than doing the buffering in SoupSocket. --- configure.ac | 5 +- libsoup/Makefile.am | 2 + libsoup/soup-filter-input-stream.c | 257 +++++++++++++++++++++++++++++++++++++ libsoup/soup-filter-input-stream.h | 56 ++++++++ libsoup/soup-socket.c | 163 +++++++++-------------- 5 files changed, 379 insertions(+), 104 deletions(-) create mode 100644 libsoup/soup-filter-input-stream.c create mode 100644 libsoup/soup-filter-input-stream.h diff --git a/configure.ac b/configure.ac index 84d3ec6..f52e6de 100644 --- a/configure.ac +++ b/configure.ac @@ -72,12 +72,13 @@ dnl *********************** dnl *** Checks for glib *** dnl *********************** -GLIB_REQUIRED=2.31.7 +dnl FIXME: should actually be 2.33.0, but glib hasn't bumped yet +GLIB_REQUIRED=2.32.2 AM_PATH_GLIB_2_0($GLIB_REQUIRED,,,gobject gio) if test "$GLIB_LIBS" = ""; then AC_MSG_ERROR(GLIB $GLIB_REQUIRED or later is required to build libsoup) fi -GLIB_CFLAGS="$GLIB_CFLAGS -DG_DISABLE_SINGLE_INCLUDES" +GLIB_CFLAGS="$GLIB_CFLAGS -DGLIB_VERSION_MIN_REQUIRED=GLIB_VERSION_2_34" GLIB_MAKEFILE='$(top_srcdir)/Makefile.glib' AC_SUBST(GLIB_MAKEFILE) diff --git a/libsoup/Makefile.am b/libsoup/Makefile.am index 2b36a86..4526ca5 100644 --- a/libsoup/Makefile.am +++ b/libsoup/Makefile.am @@ -109,6 +109,8 @@ libsoup_2_4_la_SOURCES = \ soup-directory-input-stream.c \ soup-enum-types.h \ soup-enum-types.c \ + soup-filter-input-stream.c \ + soup-filter-input-stream.h \ soup-form.c \ soup-headers.c \ soup-http-input-stream.h \ diff --git a/libsoup/soup-filter-input-stream.c b/libsoup/soup-filter-input-stream.c new file mode 100644 index 0000000..52ec6cd --- /dev/null +++ b/libsoup/soup-filter-input-stream.c @@ -0,0 +1,257 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ +/* + * soup-filter-input-stream.c + * + * Copyright 2012 Red Hat, Inc. + */ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include +#include + +#include "soup-filter-input-stream.h" + +/* This is essentially a subset of GDataInputStream, except that we + * can do the equivalent of "fill_nonblocking()" on it. (We could use + * an actual GDataInputStream, and implement the nonblocking semantics + * via fill_async(), but that would be more work...) + */ + +struct _SoupFilterInputStreamPrivate { + GByteArray *buf; +}; + +static void soup_filter_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data); + +G_DEFINE_TYPE_WITH_CODE (SoupFilterInputStream, soup_filter_input_stream, G_TYPE_FILTER_INPUT_STREAM, + G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, + soup_filter_input_stream_pollable_init)) + +static void +soup_filter_input_stream_init (SoupFilterInputStream *stream) +{ + stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, + SOUP_TYPE_FILTER_INPUT_STREAM, + SoupFilterInputStreamPrivate); +} + +static void +finalize (GObject *object) +{ + SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (object); + + if (fstream->priv->buf) + g_byte_array_free (fstream->priv->buf, TRUE); + + G_OBJECT_CLASS (soup_filter_input_stream_parent_class)->finalize (object); +} + +static gssize +read_from_buf (SoupFilterInputStream *fstream, gpointer buffer, gsize count) +{ + GByteArray *buf = fstream->priv->buf; + + if (buf->len < count) + count = buf->len; + memcpy (buffer, buf->data, count); + + if (count == buf->len) { + g_byte_array_free (buf, TRUE); + fstream->priv->buf = NULL; + } else { + memmove (buf->data, buf->data + count, + buf->len - count); + g_byte_array_set_size (buf, buf->len - count); + } + + return count; +} + +static gssize +soup_filter_input_stream_read_fn (GInputStream *stream, + void *buffer, + gsize count, + GCancellable *cancellable, + GError **error) +{ + SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (stream); + + if (fstream->priv->buf) { + return read_from_buf (fstream, buffer, count); + } else { + return g_pollable_stream_read (G_FILTER_INPUT_STREAM (fstream)->base_stream, + buffer, count, + TRUE, cancellable, error); + } +} + +static gboolean +soup_filter_input_stream_is_readable (GPollableInputStream *stream) +{ + SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (stream); + + if (fstream->priv->buf) + return TRUE; + else + return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (G_FILTER_INPUT_STREAM (fstream)->base_stream)); +} + +static gssize +soup_filter_input_stream_read_nonblocking (GPollableInputStream *stream, + void *buffer, + gsize count, + GError **error) +{ + SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (stream); + + if (fstream->priv->buf) { + return read_from_buf (fstream, buffer, count); + } else { + return g_pollable_stream_read (G_FILTER_INPUT_STREAM (fstream)->base_stream, + buffer, count, + FALSE, NULL, error); + } +} + +static GSource * +soup_filter_input_stream_create_source (GPollableInputStream *stream, + GCancellable *cancellable) +{ + SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (stream); + GSource *base_source, *pollable_source; + + if (fstream->priv->buf) + base_source = g_timeout_source_new (0); + else + base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (G_FILTER_INPUT_STREAM (fstream)->base_stream), cancellable); + + g_source_set_dummy_callback (base_source); + pollable_source = g_pollable_source_new (G_OBJECT (stream)); + g_source_add_child_source (pollable_source, base_source); + g_source_unref (base_source); + + return pollable_source; +} + +static void +soup_filter_input_stream_class_init (SoupFilterInputStreamClass *stream_class) +{ + GObjectClass *object_class = G_OBJECT_CLASS (stream_class); + GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (stream_class); + + g_type_class_add_private (stream_class, sizeof (SoupFilterInputStreamPrivate)); + + object_class->finalize = finalize; + + input_stream_class->read_fn = soup_filter_input_stream_read_fn; +} + +static void +soup_filter_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, + gpointer interface_data) +{ + pollable_interface->is_readable = soup_filter_input_stream_is_readable; + pollable_interface->read_nonblocking = soup_filter_input_stream_read_nonblocking; + pollable_interface->create_source = soup_filter_input_stream_create_source; +} + +GInputStream * +soup_filter_input_stream_new (GInputStream *base_stream) +{ + return g_object_new (SOUP_TYPE_FILTER_INPUT_STREAM, + "base-stream", base_stream, + "close-base-stream", FALSE, + NULL); +} + +gssize +soup_filter_input_stream_read_line (SoupFilterInputStream *fstream, + void *buffer, + gsize length, + gboolean blocking, + gboolean *got_line, + GCancellable *cancellable, + GError **error) +{ + return soup_filter_input_stream_read_until (fstream, buffer, length, + "\n", 1, blocking, + got_line, + cancellable, error); +} + +gssize +soup_filter_input_stream_read_until (SoupFilterInputStream *fstream, + void *buffer, + gsize length, + const void *boundary, + gsize boundary_length, + gboolean blocking, + gboolean *got_boundary, + GCancellable *cancellable, + GError **error) +{ + gssize nread; + guint8 *p, *buf, *end; + gboolean eof = FALSE; + + g_return_val_if_fail (SOUP_IS_FILTER_INPUT_STREAM (fstream), -1); + g_return_val_if_fail (boundary_length < length, -1); + + *got_boundary = FALSE; + + if (!fstream->priv->buf || fstream->priv->buf->len < boundary_length) { + guint prev_len; + + fill_buffer: + if (!fstream->priv->buf) + fstream->priv->buf = g_byte_array_new (); + prev_len = fstream->priv->buf->len; + g_byte_array_set_size (fstream->priv->buf, length); + buf = fstream->priv->buf->data; + + nread = g_pollable_stream_read (G_FILTER_INPUT_STREAM (fstream)->base_stream, + buf + prev_len, length - prev_len, + blocking, + cancellable, error); + if (nread <= 0) { + if (prev_len) + fstream->priv->buf->len = prev_len; + else { + g_byte_array_free (fstream->priv->buf, TRUE); + fstream->priv->buf = NULL; + } + + if (nread == 0 && prev_len) + eof = TRUE; + else + return nread; + } else + fstream->priv->buf->len = prev_len + nread; + } else + buf = fstream->priv->buf->data; + + /* Scan for the boundary */ + end = buf + fstream->priv->buf->len; + if (!eof) + end -= boundary_length; + for (p = buf; p <= end; p++) { + if (!memcmp (p, boundary, boundary_length)) { + p += boundary_length; + *got_boundary = TRUE; + break; + } + } + + if (!*got_boundary && fstream->priv->buf->len < length && !eof) + goto fill_buffer; + + /* Return everything up to 'p' (which is either just after the + * boundary, @boundary_len - 1 bytes before the end of the + * buffer, or end-of-file). + */ + return read_from_buf (fstream, buffer, p - buf); +} diff --git a/libsoup/soup-filter-input-stream.h b/libsoup/soup-filter-input-stream.h new file mode 100644 index 0000000..b86a476 --- /dev/null +++ b/libsoup/soup-filter-input-stream.h @@ -0,0 +1,56 @@ +/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */ +/* + * Copyright 2012 Red Hat, Inc. + */ + +#ifndef SOUP_FILTER_INPUT_STREAM_H +#define SOUP_FILTER_INPUT_STREAM_H 1 + +#include + +G_BEGIN_DECLS + +#define SOUP_TYPE_FILTER_INPUT_STREAM (soup_filter_input_stream_get_type ()) +#define SOUP_FILTER_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_FILTER_INPUT_STREAM, SoupFilterInputStream)) +#define SOUP_FILTER_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_FILTER_INPUT_STREAM, SoupFilterInputStreamClass)) +#define SOUP_IS_FILTER_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_FILTER_INPUT_STREAM)) +#define SOUP_IS_FILTER_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((obj), SOUP_TYPE_FILTER_INPUT_STREAM)) +#define SOUP_FILTER_INPUT_STREAM_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_FILTER_INPUT_STREAM, SoupFilterInputStreamClass)) + +typedef struct _SoupFilterInputStreamPrivate SoupFilterInputStreamPrivate; + +typedef struct { + GFilterInputStream parent; + + SoupFilterInputStreamPrivate *priv; +} SoupFilterInputStream; + +typedef struct { + GFilterInputStreamClass parent_class; + +} SoupFilterInputStreamClass; + +GType soup_filter_input_stream_get_type (void); + +GInputStream *soup_filter_input_stream_new (GInputStream *base_stream); + +gssize soup_filter_input_stream_read_line (SoupFilterInputStream *fstream, + void *buffer, + gsize length, + gboolean blocking, + gboolean *got_line, + GCancellable *cancellable, + GError **error); +gssize soup_filter_input_stream_read_until (SoupFilterInputStream *fstream, + void *buffer, + gsize length, + const void *boundary, + gsize boundary_len, + gboolean blocking, + gboolean *got_boundary, + GCancellable *cancellable, + GError **error); + +G_END_DECLS + +#endif /* SOUP_FILTER_INPUT_STREAM_H */ diff --git a/libsoup/soup-socket.c b/libsoup/soup-socket.c index 26c90c4..049be92 100644 --- a/libsoup/soup-socket.c +++ b/libsoup/soup-socket.c @@ -16,8 +16,9 @@ #include #include -#include "soup-address.h" #include "soup-socket.h" +#include "soup-address.h" +#include "soup-filter-input-stream.h" #include "soup-marshal.h" #include "soup-misc.h" #include "soup-misc-private.h" @@ -70,8 +71,8 @@ typedef struct { SoupAddress *local_addr, *remote_addr; GIOStream *conn; GSocket *gsock; - GPollableInputStream *istream; - GPollableOutputStream *ostream; + GInputStream *istream; + GOutputStream *ostream; GTlsCertificateFlags tls_errors; guint non_blocking:1; @@ -86,7 +87,6 @@ typedef struct { GMainContext *async_context; GSource *watch_src; GSource *read_src, *write_src; - GByteArray *read_buf; GMutex iolock, addrlock; guint timeout; @@ -128,10 +128,9 @@ disconnect_internal (SoupSocket *sock, gboolean close) if (priv->conn) { if (G_IS_TLS_CONNECTION (priv->conn)) g_signal_handlers_disconnect_by_func (priv->conn, soup_socket_peer_certificate_changed, sock); - g_object_unref (priv->conn); - priv->conn = NULL; - priv->istream = NULL; - priv->ostream = NULL; + g_clear_object (&priv->conn); + g_clear_object (&priv->istream); + g_clear_object (&priv->ostream); } if (priv->read_src) { @@ -160,6 +159,9 @@ finalize (GObject *object) disconnect_internal (SOUP_SOCKET (object), TRUE); } + g_clear_object (&priv->istream); + g_clear_object (&priv->ostream); + if (priv->local_addr) g_object_unref (priv->local_addr); if (priv->remote_addr) @@ -173,9 +175,6 @@ finalize (GObject *object) if (priv->async_context) g_main_context_unref (priv->async_context); - if (priv->read_buf) - g_byte_array_free (priv->read_buf, TRUE); - g_mutex_clear (&priv->addrlock); g_mutex_clear (&priv->iolock); @@ -521,9 +520,9 @@ finish_socket_setup (SoupSocketPrivate *priv) if (!priv->conn) priv->conn = (GIOStream *)g_socket_connection_factory_create_connection (priv->gsock); if (!priv->istream) - priv->istream = G_POLLABLE_INPUT_STREAM (g_io_stream_get_input_stream (priv->conn)); + priv->istream = soup_filter_input_stream_new (g_io_stream_get_input_stream (priv->conn)); if (!priv->ostream) - priv->ostream = G_POLLABLE_OUTPUT_STREAM (g_io_stream_get_output_stream (priv->conn)); + priv->ostream = g_object_ref (g_io_stream_get_output_stream (priv->conn)); g_socket_set_timeout (priv->gsock, priv->timeout); } @@ -862,9 +861,9 @@ soup_socket_create_watch (SoupSocketPrivate *priv, GIOCondition cond, GMainContext *async_context; if (cond == G_IO_IN) - watch = g_pollable_input_stream_create_source (priv->istream, cancellable); + watch = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (priv->istream), cancellable); else - watch = g_pollable_output_stream_create_source (priv->ostream, cancellable); + watch = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (priv->ostream), cancellable); g_source_set_callback (watch, (GSourceFunc)callback, user_data, NULL); if (priv->use_thread_context) @@ -1089,8 +1088,13 @@ soup_socket_start_proxy_ssl (SoupSocket *sock, const char *ssl_host, g_signal_connect (priv->conn, "notify::peer-certificate", G_CALLBACK (soup_socket_peer_certificate_changed), sock); - priv->istream = G_POLLABLE_INPUT_STREAM (g_io_stream_get_input_stream (priv->conn)); - priv->ostream = G_POLLABLE_OUTPUT_STREAM (g_io_stream_get_output_stream (priv->conn)); + if (priv->istream) + g_object_unref (priv->istream); + if (priv->ostream) + g_object_unref (priv->ostream); + + priv->istream = soup_filter_input_stream_new (g_io_stream_get_input_stream (priv->conn)); + priv->ostream = g_object_ref (g_io_stream_get_output_stream (priv->conn)); return TRUE; } @@ -1334,34 +1338,18 @@ socket_read_watch (GObject *pollable, gpointer user_data) } static SoupSocketIOStatus -read_from_network (SoupSocket *sock, gpointer buffer, gsize len, - gsize *nread, GCancellable *cancellable, GError **error) +translate_read_status (SoupSocket *sock, GCancellable *cancellable, + gssize my_nread, gsize *nread, + GError *my_err, GError **error) { SoupSocketPrivate *priv = SOUP_SOCKET_GET_PRIVATE (sock); - GError *my_err = NULL; - gssize my_nread; - - *nread = 0; - - if (!priv->conn) - return SOUP_SOCKET_EOF; - - if (!priv->non_blocking) { - my_nread = g_input_stream_read (G_INPUT_STREAM (priv->istream), - buffer, len, - cancellable, &my_err); - } else { - my_nread = g_pollable_input_stream_read_nonblocking ( - priv->istream, buffer, len, - cancellable, &my_err); - } if (my_nread > 0) { - g_clear_error (&my_err); + g_assert_no_error (my_err); *nread = my_nread; return SOUP_SOCKET_OK; } else if (my_nread == 0) { - g_clear_error (&my_err); + g_assert_no_error (my_err); *nread = my_nread; return SOUP_SOCKET_EOF; } else if (g_error_matches (my_err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { @@ -1379,27 +1367,6 @@ read_from_network (SoupSocket *sock, gpointer buffer, gsize len, return SOUP_SOCKET_ERROR; } -static SoupSocketIOStatus -read_from_buf (SoupSocket *sock, gpointer buffer, gsize len, gsize *nread) -{ - SoupSocketPrivate *priv = SOUP_SOCKET_GET_PRIVATE (sock); - GByteArray *read_buf = priv->read_buf; - - *nread = MIN (read_buf->len, len); - memcpy (buffer, read_buf->data, *nread); - - if (*nread == read_buf->len) { - g_byte_array_free (read_buf, TRUE); - priv->read_buf = NULL; - } else { - memmove (read_buf->data, read_buf->data + *nread, - read_buf->len - *nread); - g_byte_array_set_size (read_buf, read_buf->len - *nread); - } - - return SOUP_SOCKET_OK; -} - /** * SoupSocketIOStatus: * @SOUP_SOCKET_OK: Success @@ -1443,6 +1410,8 @@ soup_socket_read (SoupSocket *sock, gpointer buffer, gsize len, { SoupSocketPrivate *priv; SoupSocketIOStatus status; + gssize my_nread; + GError *my_err = NULL; g_return_val_if_fail (SOUP_IS_SOCKET (sock), SOUP_SOCKET_ERROR); g_return_val_if_fail (nread != NULL, SOUP_SOCKET_ERROR); @@ -1450,10 +1419,24 @@ soup_socket_read (SoupSocket *sock, gpointer buffer, gsize len, priv = SOUP_SOCKET_GET_PRIVATE (sock); g_mutex_lock (&priv->iolock); - if (priv->read_buf) - status = read_from_buf (sock, buffer, len, nread); - else - status = read_from_network (sock, buffer, len, nread, cancellable, error); + + if (!priv->istream) { + status = SOUP_SOCKET_EOF; + goto out; + } + + if (!priv->non_blocking) { + my_nread = g_input_stream_read (priv->istream, buffer, len, + cancellable, &my_err); + } else { + my_nread = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (priv->istream), + buffer, len, + cancellable, &my_err); + } + status = translate_read_status (sock, cancellable, + my_nread, nread, my_err, error); + +out: g_mutex_unlock (&priv->iolock); return status; @@ -1495,9 +1478,8 @@ soup_socket_read_until (SoupSocket *sock, gpointer buffer, gsize len, { SoupSocketPrivate *priv; SoupSocketIOStatus status; - GByteArray *read_buf; - guint match_len, prev_len; - guint8 *p, *end; + gssize my_nread; + GError *my_err = NULL; g_return_val_if_fail (SOUP_IS_SOCKET (sock), SOUP_SOCKET_ERROR); g_return_val_if_fail (nread != NULL, SOUP_SOCKET_ERROR); @@ -1509,41 +1491,18 @@ soup_socket_read_until (SoupSocket *sock, gpointer buffer, gsize len, *got_boundary = FALSE; - if (!priv->read_buf) - priv->read_buf = g_byte_array_new (); - read_buf = priv->read_buf; - - if (read_buf->len < boundary_len) { - prev_len = read_buf->len; - g_byte_array_set_size (read_buf, len); - status = read_from_network (sock, - read_buf->data + prev_len, - len - prev_len, nread, cancellable, error); - read_buf->len = prev_len + *nread; - - if (status != SOUP_SOCKET_OK) { - g_mutex_unlock (&priv->iolock); - return status; - } - } - - /* Scan for the boundary */ - end = read_buf->data + read_buf->len; - for (p = read_buf->data; p <= end - boundary_len; p++) { - if (!memcmp (p, boundary, boundary_len)) { - p += boundary_len; - *got_boundary = TRUE; - break; - } + if (!priv->istream) + status = SOUP_SOCKET_EOF; + else { + my_nread = soup_filter_input_stream_read_until ( + SOUP_FILTER_INPUT_STREAM (priv->istream), + buffer, len, boundary, boundary_len, + !priv->non_blocking, + got_boundary, cancellable, &my_err); + status = translate_read_status (sock, cancellable, + my_nread, nread, my_err, error); } - /* Return everything up to 'p' (which is either just after the - * boundary, or @boundary_len - 1 bytes before the end of the - * buffer). - */ - match_len = p - read_buf->data; - status = read_from_buf (sock, buffer, MIN (len, match_len), nread); - g_mutex_unlock (&priv->iolock); return status; } @@ -1611,13 +1570,13 @@ soup_socket_write (SoupSocket *sock, gconstpointer buffer, } if (!priv->non_blocking) { - my_nwrote = g_output_stream_write (G_OUTPUT_STREAM (priv->ostream), + my_nwrote = g_output_stream_write (priv->ostream, buffer, len, cancellable, &my_err); } else { my_nwrote = g_pollable_output_stream_write_nonblocking ( - priv->ostream, buffer, len, - cancellable, &my_err); + G_POLLABLE_OUTPUT_STREAM (priv->ostream), + buffer, len, cancellable, &my_err); } if (my_nwrote > 0) { -- 2.7.4