soup-message-io: move content sniffing out into a wrapper stream
authorDan Winship <danw@gnome.org>
Thu, 8 Sep 2011 23:24:21 +0000 (19:24 -0400)
committerDan Winship <danw@gnome.org>
Wed, 18 Apr 2012 01:26:23 +0000 (21:26 -0400)
Add a wrapper input stream that handles content sniffing, and use that
from soup-message-io.

libsoup/Makefile.am
libsoup/soup-content-sniffer-stream.c [new file with mode: 0644]
libsoup/soup-content-sniffer-stream.h [new file with mode: 0644]
libsoup/soup-content-sniffer.c
libsoup/soup-content-sniffer.h
libsoup/soup-message-io.c

index 3be0f9b..ad85b07 100644 (file)
@@ -105,6 +105,8 @@ libsoup_2_4_la_SOURCES =            \
        soup-connection.c               \
        soup-content-decoder.c          \
        soup-content-sniffer.c          \
+       soup-content-sniffer-stream.h   \
+       soup-content-sniffer-stream.c   \
        soup-converter-wrapper.h        \
        soup-converter-wrapper.c        \
        soup-cookie.c                   \
diff --git a/libsoup/soup-content-sniffer-stream.c b/libsoup/soup-content-sniffer-stream.c
new file mode 100644 (file)
index 0000000..a391c85
--- /dev/null
@@ -0,0 +1,368 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * soup-content-sniffer-stream.c
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <string.h>
+#include <gio/gio.h>
+
+#include "soup-content-sniffer-stream.h"
+#include "soup-content-sniffer.h"
+#include "soup-message.h"
+
+static void soup_content_sniffer_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
+
+G_DEFINE_TYPE_WITH_CODE (SoupContentSnifferStream, soup_content_sniffer_stream, G_TYPE_FILTER_INPUT_STREAM,
+                        G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+                                               soup_content_sniffer_stream_pollable_init))
+
+enum {
+       PROP_0,
+
+       PROP_SNIFFER,
+       PROP_MESSAGE,
+};
+
+struct _SoupContentSnifferStreamPrivate {
+       SoupContentSniffer *sniffer;
+       SoupMessage *msg;
+
+       guchar *buffer;
+       gsize buffer_size, buffer_nread;
+       gboolean sniffing;
+       GError *error;
+
+       char *sniffed_type;
+       GHashTable *sniffed_params;
+};
+
+static void
+soup_content_sniffer_stream_finalize (GObject *object)
+{
+       SoupContentSnifferStream *sniffer = SOUP_CONTENT_SNIFFER_STREAM (object);
+
+       if (sniffer->priv->sniffer)
+               g_object_unref (sniffer->priv->sniffer);
+       if (sniffer->priv->msg)
+               g_object_unref (sniffer->priv->msg);
+       if (sniffer->priv->buffer)
+               g_free (sniffer->priv->buffer);
+       if (sniffer->priv->error)
+               g_error_free (sniffer->priv->error);
+       if (sniffer->priv->sniffed_type)
+               g_free (sniffer->priv->sniffed_type);
+       if (sniffer->priv->sniffed_params)
+               g_hash_table_unref (sniffer->priv->sniffed_params);
+
+       G_OBJECT_CLASS (soup_content_sniffer_stream_parent_class)->finalize (object);
+}
+
+static void
+soup_content_sniffer_stream_set_property (GObject *object, guint prop_id,
+                                         const GValue *value, GParamSpec *pspec)
+{
+       SoupContentSnifferStream *sniffer = SOUP_CONTENT_SNIFFER_STREAM (object);
+
+       switch (prop_id) {
+       case PROP_SNIFFER:
+               sniffer->priv->sniffer = g_value_dup_object (value);
+               /* FIXME: supposed to wait until after got-headers for this */
+               sniffer->priv->buffer_size = soup_content_sniffer_get_buffer_size (sniffer->priv->sniffer);
+               sniffer->priv->buffer = g_malloc (sniffer->priv->buffer_size);
+               break;
+       case PROP_MESSAGE:
+               sniffer->priv->msg = g_value_dup_object (value);
+               break;
+       default:
+               G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+               break;
+       }
+}
+
+static void
+soup_content_sniffer_stream_get_property (GObject *object, guint prop_id,
+                                         GValue *value, GParamSpec *pspec)
+{
+       SoupContentSnifferStream *sniffer = SOUP_CONTENT_SNIFFER_STREAM (object);
+
+       switch (prop_id) {
+       case PROP_SNIFFER:
+               g_value_set_object (value, sniffer->priv->sniffer);
+               break;
+       case PROP_MESSAGE:
+               g_value_set_object (value, sniffer->priv->msg);
+               break;
+       default:
+               G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+               break;
+       }
+}
+
+static gssize
+read_and_sniff (GInputStream *stream, gboolean blocking,
+               GCancellable *cancellable, GError **error)
+{
+       SoupContentSnifferStreamPrivate *priv = SOUP_CONTENT_SNIFFER_STREAM (stream)->priv;
+       gssize nread;
+       GError *my_error = NULL;
+       SoupBuffer *buf;
+
+       do {
+               nread = g_pollable_stream_read (G_FILTER_INPUT_STREAM (stream)->base_stream,
+                                               priv->buffer + priv->buffer_nread,
+                                               priv->buffer_size - priv->buffer_nread,
+                                               blocking, cancellable, &my_error);
+               if (nread <= 0)
+                       break;
+               priv->buffer_nread += nread;
+       } while (priv->buffer_nread < priv->buffer_size);
+
+       /* If we got EAGAIN or cancellation before filling the buffer,
+        * just return that right away. Likewise if we got any other
+        * error without ever reading any data. Otherwise, save the
+        * error to return after we're done sniffing.
+        */
+       if (my_error) {
+               if (g_error_matches (my_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
+                   g_error_matches (my_error, G_IO_ERROR, G_IO_ERROR_CANCELLED) ||
+                   priv->buffer_nread == 0) {
+                       g_propagate_error (error, my_error);
+                       return -1;
+               } else
+                       priv->error = my_error;
+       }
+
+       /* Sniff, then return the data */
+       buf = soup_buffer_new (SOUP_MEMORY_TEMPORARY, priv->buffer, priv->buffer_nread);
+       priv->sniffed_type =
+               soup_content_sniffer_sniff (priv->sniffer, priv->msg, buf,
+                                           &priv->sniffed_params);
+       soup_buffer_free (buf);
+       priv->sniffing = FALSE;
+
+       return priv->buffer_nread;
+}      
+
+static gssize
+read_internal (GInputStream  *stream,
+              void          *buffer,
+              gsize          count,
+              gboolean       blocking,
+              GCancellable  *cancellable,
+              GError       **error)
+{
+       SoupContentSnifferStream *sniffer = SOUP_CONTENT_SNIFFER_STREAM (stream);
+       gssize nread;
+
+       if (sniffer->priv->error) {
+               g_propagate_error (error, sniffer->priv->error);
+               sniffer->priv->error = NULL;
+               return -1;
+       }
+
+       if (sniffer->priv->sniffing) {
+               nread = read_and_sniff (stream, blocking, cancellable, error);
+               if (nread <= 0)
+                       return nread;
+       }
+
+       if (sniffer->priv->buffer) {
+               nread = MIN (count, sniffer->priv->buffer_nread);
+               memcpy (buffer, sniffer->priv->buffer, nread);
+               if (nread == sniffer->priv->buffer_nread) {
+                       g_free (sniffer->priv->buffer);
+                       sniffer->priv->buffer = NULL;
+               } else {
+                       /* FIXME, inefficient */
+                       memmove (sniffer->priv->buffer,
+                                sniffer->priv->buffer + nread,
+                                sniffer->priv->buffer_nread - nread);
+                       sniffer->priv->buffer_nread -= nread;
+               }
+       } else {
+               nread = g_pollable_stream_read (G_FILTER_INPUT_STREAM (stream)->base_stream,
+                                               buffer, count, blocking,
+                                               cancellable, error);
+       }
+       return nread;
+}
+
+static gssize
+soup_content_sniffer_stream_read (GInputStream  *stream,
+                                 void          *buffer,
+                                 gsize          count,
+                                 GCancellable  *cancellable,
+                                 GError       **error)
+{
+       return read_internal (stream, buffer, count, TRUE,
+                             cancellable, error);
+}
+
+static gssize
+soup_content_sniffer_stream_skip (GInputStream  *stream,
+                                 gsize          count,
+                                 GCancellable  *cancellable,
+                                 GError       **error)
+{
+       SoupContentSnifferStream *sniffer = SOUP_CONTENT_SNIFFER_STREAM (stream);
+       gssize nskipped;
+
+       if (sniffer->priv->sniffing) {
+               /* Read into the internal buffer... */
+               nskipped = soup_content_sniffer_stream_read (stream, NULL, 0, cancellable, error);
+               if (nskipped == -1)
+                       return -1;
+               /* Now fall through */
+       }
+
+       if (sniffer->priv->buffer) {
+               nskipped = MIN (count, sniffer->priv->buffer_nread);
+               if (nskipped == sniffer->priv->buffer_nread) {
+                       g_free (sniffer->priv->buffer);
+                       sniffer->priv->buffer = NULL;
+               } else {
+                       /* FIXME */
+                       memmove (sniffer->priv->buffer,
+                                sniffer->priv->buffer + nskipped,
+                                sniffer->priv->buffer_nread - nskipped);
+                       sniffer->priv->buffer_nread -= nskipped;
+               }
+       } else {
+               nskipped = G_INPUT_STREAM_CLASS (soup_content_sniffer_stream_parent_class)->
+                       skip (stream, count, cancellable, error);
+       }
+       return nskipped;
+}
+
+static gboolean
+soup_content_sniffer_stream_is_readable (GPollableInputStream *stream)
+{
+       SoupContentSnifferStream *sniffer = SOUP_CONTENT_SNIFFER_STREAM (stream);
+
+       if (sniffer->priv->error ||
+           (!sniffer->priv->sniffing && sniffer->priv->buffer))
+               return TRUE;
+
+       return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (G_FILTER_INPUT_STREAM (stream)->base_stream));
+}
+
+static gssize
+soup_content_sniffer_stream_read_nonblocking (GPollableInputStream  *stream,
+                                             void                  *buffer,
+                                             gsize                  count,
+                                             GError               **error)
+{
+       return read_internal (G_INPUT_STREAM (stream), buffer, count,
+                             FALSE, NULL, error);
+}
+
+static GSource *
+soup_content_sniffer_stream_create_source (GPollableInputStream *stream,
+                                          GCancellable         *cancellable)
+{
+       SoupContentSnifferStream *sniffer = SOUP_CONTENT_SNIFFER_STREAM (stream);
+       GSource *base_source, *pollable_source;
+
+       if (sniffer->priv->error ||
+           (!sniffer->priv->sniffing && sniffer->priv->buffer))
+               base_source = g_timeout_source_new (0);
+       else
+               base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (G_FILTER_INPUT_STREAM (stream)->base_stream), cancellable);
+
+       g_source_set_dummy_callback (base_source);
+       pollable_source = g_pollable_source_new (G_OBJECT (stream));
+       g_source_add_child_source (pollable_source, base_source);
+       g_source_unref (base_source);
+
+       return pollable_source;
+}
+
+static void
+soup_content_sniffer_stream_init (SoupContentSnifferStream *sniffer)
+{
+       sniffer->priv = G_TYPE_INSTANCE_GET_PRIVATE (sniffer,
+                                                    SOUP_TYPE_CONTENT_SNIFFER_STREAM,
+                                                    SoupContentSnifferStreamPrivate);
+       sniffer->priv->sniffing = TRUE;
+}
+
+static void
+soup_content_sniffer_stream_class_init (SoupContentSnifferStreamClass *sniffer_class)
+{
+       GObjectClass *object_class = G_OBJECT_CLASS (sniffer_class);
+       GInputStreamClass *input_stream_class =
+               G_INPUT_STREAM_CLASS (sniffer_class);
+       g_type_class_add_private (sniffer_class, sizeof (SoupContentSnifferStreamPrivate));
+
+       object_class->finalize = soup_content_sniffer_stream_finalize;
+       object_class->set_property = soup_content_sniffer_stream_set_property;
+       object_class->get_property = soup_content_sniffer_stream_get_property;
+
+       input_stream_class->read_fn = soup_content_sniffer_stream_read;
+       input_stream_class->skip = soup_content_sniffer_stream_skip;
+
+       g_object_class_install_property (
+               object_class, PROP_SNIFFER,
+               g_param_spec_object ("sniffer",
+                                    "Sniffer",
+                                    "The stream's SoupContentSniffer",
+                                    SOUP_TYPE_CONTENT_SNIFFER,
+                                    G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
+       g_object_class_install_property (
+               object_class, PROP_MESSAGE,
+               g_param_spec_object ("message",
+                                    "Message",
+                                    "The stream's SoupMessage",
+                                    SOUP_TYPE_MESSAGE,
+                                    G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
+}
+
+static void
+soup_content_sniffer_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
+                                          gpointer                       interface_data)
+{
+       pollable_interface->is_readable = soup_content_sniffer_stream_is_readable;
+       pollable_interface->read_nonblocking = soup_content_sniffer_stream_read_nonblocking;
+       pollable_interface->create_source = soup_content_sniffer_stream_create_source;
+}
+
+GInputStream *
+soup_content_sniffer_stream_new (SoupContentSniffer *sniffer,
+                                SoupMessage        *msg,
+                                GInputStream       *base_stream)
+{
+       return g_object_new (SOUP_TYPE_CONTENT_SNIFFER_STREAM,
+                            "base-stream", base_stream,
+                            "message", msg,
+                            "sniffer", sniffer,
+                            NULL);
+}
+
+gboolean
+soup_content_sniffer_stream_is_ready (SoupContentSnifferStream  *sniffer,
+                                     gboolean                   blocking,
+                                     GCancellable              *cancellable,
+                                     GError                   **error)
+{
+       if (!sniffer->priv->sniffing)
+               return TRUE;
+
+       return read_and_sniff (G_INPUT_STREAM (sniffer), blocking,
+                              cancellable, error) != -1;
+}
+
+const char *
+soup_content_sniffer_stream_sniff (SoupContentSnifferStream  *sniffer,
+                                  GHashTable               **params)
+{
+       if (params)
+               *params = sniffer->priv->sniffed_params;
+       return sniffer->priv->sniffed_type;
+}
diff --git a/libsoup/soup-content-sniffer-stream.h b/libsoup/soup-content-sniffer-stream.h
new file mode 100644 (file)
index 0000000..fb4889c
--- /dev/null
@@ -0,0 +1,53 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2010 Red Hat, Inc.
+ */
+
+#ifndef SOUP_CONTENT_SNIFFER_STREAM_H
+#define SOUP_CONTENT_SNIFFER_STREAM_H 1
+
+#include <libsoup/soup-types.h>
+#include <libsoup/soup-content-sniffer.h>
+
+G_BEGIN_DECLS
+
+#define SOUP_TYPE_CONTENT_SNIFFER_STREAM         (soup_content_sniffer_stream_get_type ())
+#define SOUP_CONTENT_SNIFFER_STREAM(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), SOUP_TYPE_CONTENT_SNIFFER_STREAM, SoupContentSnifferStream))
+#define SOUP_CONTENT_SNIFFER_STREAM_CLASS(k)     (G_TYPE_CHECK_CLASS_CAST((k), SOUP_TYPE_CONTENT_SNIFFER_STREAM, SoupContentSnifferStreamClass))
+#define SOUP_IS_CONTENT_SNIFFER_STREAM(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), SOUP_TYPE_CONTENT_SNIFFER_STREAM))
+#define SOUP_IS_CONTENT_SNIFFER_STREAM_CLASS(k)  (G_TYPE_CHECK_CLASS_TYPE ((k), SOUP_TYPE_CONTENT_SNIFFER_STREAM))
+#define SOUP_CONTENT_SNIFFER_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), SOUP_TYPE_CONTENT_SNIFFER_STREAM, SoupContentSnifferStreamClass))
+
+typedef struct _SoupContentSnifferStream        SoupContentSnifferStream;
+typedef struct _SoupContentSnifferStreamPrivate SoupContentSnifferStreamPrivate;
+typedef struct _SoupContentSnifferStreamClass   SoupContentSnifferStreamClass;
+
+struct _SoupContentSnifferStream {
+       GFilterInputStream parent_instance;
+
+       /*< private >*/
+       SoupContentSnifferStreamPrivate *priv;
+};
+
+struct _SoupContentSnifferStreamClass {
+       GFilterInputStreamClass parent_class;
+
+};
+
+GType soup_content_sniffer_stream_get_type (void) G_GNUC_CONST;
+
+GInputStream *soup_content_sniffer_stream_new      (SoupContentSniffer        *sniffer,
+                                                   SoupMessage               *msg,
+                                                   GInputStream              *base_stream);
+
+gboolean      soup_content_sniffer_stream_is_ready (SoupContentSnifferStream  *sniffer,
+                                                   gboolean                   blocking,
+                                                   GCancellable              *cancellable,
+                                                   GError                   **error);
+const char   *soup_content_sniffer_stream_sniff    (SoupContentSnifferStream  *sniffer,
+                                                   GHashTable               **params);
+
+
+G_END_DECLS
+
+#endif /* SOUP_CONTENT_SNIFFER_STREAM_H */
index 4b96735..67ff352 100644 (file)
@@ -106,6 +106,14 @@ soup_content_sniffer_sniff (SoupContentSniffer *sniffer,
        return SOUP_CONTENT_SNIFFER_GET_CLASS (sniffer)->sniff (sniffer, msg, buffer, params);
 }
 
+gsize
+soup_content_sniffer_get_buffer_size (SoupContentSniffer *sniffer)
+{
+       g_return_val_if_fail (SOUP_IS_CONTENT_SNIFFER (sniffer), 0);
+
+       return SOUP_CONTENT_SNIFFER_GET_CLASS (sniffer)->get_buffer_size (sniffer);
+}
+
 /* This table is based on the HTML5 spec;
  * See 2.7.4 Content-Type sniffing: unknown type
  */
@@ -548,9 +556,8 @@ static void
 soup_content_sniffer_got_headers_cb (SoupMessage *msg, SoupContentSniffer *sniffer)
 {
        SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
-       SoupContentSnifferClass *content_sniffer_class = SOUP_CONTENT_SNIFFER_GET_CLASS (sniffer);
 
-       priv->bytes_for_sniffing = content_sniffer_class->get_buffer_size (sniffer);
+       priv->bytes_for_sniffing = soup_content_sniffer_get_buffer_size (sniffer);
 }
 
 static void
index a8aa915..59c0154 100644 (file)
@@ -43,14 +43,15 @@ typedef struct {
        void (*_libsoup_reserved5) (void);
 } SoupContentSnifferClass;
 
-GType               soup_content_sniffer_get_type (void);
+GType               soup_content_sniffer_get_type        (void);
 
-SoupContentSniffer *soup_content_sniffer_new      (void);
+SoupContentSniffer *soup_content_sniffer_new             (void);
 
-char               *soup_content_sniffer_sniff    (SoupContentSniffer *sniffer,
-                                                  SoupMessage *msg,
-                                                  SoupBuffer *buffer,
-                                                  GHashTable **params);
+char               *soup_content_sniffer_sniff           (SoupContentSniffer  *sniffer,
+                                                         SoupMessage         *msg,
+                                                         SoupBuffer          *buffer,
+                                                         GHashTable         **params);
+gsize               soup_content_sniffer_get_buffer_size (SoupContentSniffer  *sniffer);
 
 G_END_DECLS
 
index 6e66364..e71a8ad 100644 (file)
@@ -15,6 +15,7 @@
 #include "soup-body-input-stream.h"
 #include "soup-body-output-stream.h"
 #include "soup-connection.h"
+#include "soup-content-sniffer-stream.h"
 #include "soup-converter-wrapper.h"
 #include "soup-filter-input-stream.h"
 #include "soup-message.h"
@@ -64,9 +65,6 @@ typedef struct {
        SoupMessageBody      *read_body;
        goffset               read_length;
 
-       gboolean              need_content_sniffed, need_got_chunk;
-       SoupMessageBody      *sniff_data;
-
        SoupMessageIOState    write_state;
        SoupEncoding          write_encoding;
        GString              *write_buf;
@@ -88,14 +86,6 @@ typedef struct {
 } SoupMessageIOData;
        
 
-/* Put these around callback invocation if there is code afterward
- * that depends on the IO having not been cancelled.
- */
-#define dummy_to_make_emacs_happy {
-#define SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK { gboolean cancelled; g_object_ref (msg);
-#define SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || io->paused) return; }
-#define SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED(val) cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || io->paused) return val; }
-
 #define RESPONSE_BLOCK_SIZE 8192
 
 void
@@ -132,9 +122,6 @@ soup_message_io_cleanup (SoupMessage *msg)
        if (io->write_chunk)
                soup_buffer_free (io->write_chunk);
 
-       if (io->sniff_data)
-               soup_message_body_free (io->sniff_data);
-
        g_slice_free (SoupMessageIOData, io);
 }
 
@@ -211,55 +198,6 @@ io_error (SoupSocket *sock, SoupMessage *msg, GError *error)
 }
 
 static gboolean
-io_handle_sniffing (SoupMessage *msg, gboolean done_reading)
-{
-       SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
-       SoupMessageIOData *io = priv->io_data;
-       SoupBuffer *sniffed_buffer;
-       char *sniffed_mime_type;
-       GHashTable *params = NULL;
-
-       if (!priv->sniffer)
-               return TRUE;
-
-       if (!io->sniff_data) {
-               io->sniff_data = soup_message_body_new ();
-               io->need_content_sniffed = TRUE;
-       }
-
-       if (io->need_content_sniffed) {
-               if (io->sniff_data->length < priv->bytes_for_sniffing &&
-                   !done_reading)
-                       return TRUE;
-
-               io->need_content_sniffed = FALSE;
-               sniffed_buffer = soup_message_body_flatten (io->sniff_data);
-               sniffed_mime_type = soup_content_sniffer_sniff (priv->sniffer, msg, sniffed_buffer, &params);
-
-               SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
-               soup_message_content_sniffed (msg, sniffed_mime_type, params);
-               g_free (sniffed_mime_type);
-               if (params)
-                       g_hash_table_destroy (params);
-               if (sniffed_buffer)
-                       soup_buffer_free (sniffed_buffer);
-               SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
-       }
-
-       if (io->need_got_chunk) {
-               io->need_got_chunk = FALSE;
-               sniffed_buffer = soup_message_body_flatten (io->sniff_data);
-
-               SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
-               soup_message_got_chunk (msg, sniffed_buffer);
-               soup_buffer_free (sniffed_buffer);
-               SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
-       }
-
-       return TRUE;
-}
-
-static gboolean
 read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
 {
        SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
@@ -333,6 +271,13 @@ setup_body_istream (SoupMessage *msg)
                g_object_unref (io->body_istream);
                io->body_istream = filter;
        }
+
+       if (priv->sniffer) {
+               filter = soup_content_sniffer_stream_new (priv->sniffer,
+                                                         msg, io->body_istream);
+               g_object_unref (io->body_istream);
+               io->body_istream = filter;
+       }
 }
 
 /*
@@ -408,7 +353,7 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
                                /* Stop and wait for the body now */
                                io->write_state =
                                        SOUP_MESSAGE_IO_STATE_BLOCKING;
-                               io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
+                               io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
                        } else {
                                /* We just wrote a 1xx response
                                 * header, so stay in STATE_HEADERS.
@@ -626,7 +571,7 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
                        io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
                        io->read_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
                } else {
-                       io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
+                       io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
 
                        /* If the client was waiting for a Continue
                         * but got something else, then it's done
@@ -659,13 +604,27 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
                break;
 
 
-       case SOUP_MESSAGE_IO_STATE_BODY:
+       case SOUP_MESSAGE_IO_STATE_BODY_START:
                if (!io->body_istream)
                        setup_body_istream (msg);
 
-               if (!io_handle_sniffing (msg, FALSE))
-                       return FALSE;
+               if (priv->sniffer) {
+                       SoupContentSnifferStream *sniffer_stream = SOUP_CONTENT_SNIFFER_STREAM (io->body_istream);
+                       const char *content_type;
+                       GHashTable *params;
+
+                       if (!soup_content_sniffer_stream_is_ready (sniffer_stream, io->blocking, cancellable, error))
+                               return FALSE;
+
+                       content_type = soup_content_sniffer_stream_sniff (sniffer_stream, &params);
+                       soup_message_content_sniffed (msg, content_type, params);
+               }
 
+               io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
+               break;
+
+
+       case SOUP_MESSAGE_IO_STATE_BODY:
                if (priv->chunk_allocator) {
                        buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data);
                        if (!buffer) {
@@ -688,16 +647,6 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
                if (nread > 0) {
                        buffer->length = nread;
                        soup_message_body_got_chunk (io->read_body, buffer);
-
-                       if (io->need_content_sniffed) {
-                               soup_message_body_append_buffer (io->sniff_data, buffer);
-                               soup_buffer_free (buffer);
-                               io->need_got_chunk = TRUE;
-                               if (!io_handle_sniffing (msg, FALSE))
-                                       return FALSE;
-                               break;
-                       }
-
                        soup_message_got_chunk (msg, buffer);
                        soup_buffer_free (buffer);
                        break;
@@ -713,9 +662,6 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
 
 
        case SOUP_MESSAGE_IO_STATE_BODY_DONE:
-               if (!io_handle_sniffing (msg, TRUE))
-                       return FALSE;
-
                io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
                soup_message_got_body (msg);
                break;