souphttpsrc: Handle non-UTF8 headers and error reasons more gracefully
[platform/upstream/gst-plugins-good.git] / ext / soup / gstsouphttpsrc.c
index 0bd51a1..fc7cba7 100644 (file)
@@ -77,7 +77,6 @@
 #endif
 #include <gst/gstelement.h>
 #include <gst/gst-i18n-plugin.h>
-#include <gio/gio.h>
 #include <libsoup/soup.h>
 #include "gstsouphttpsrc.h"
 #include "gstsouputils.h"
@@ -134,6 +133,13 @@ enum
 #define DEFAULT_RETRIES              3
 #define DEFAULT_SOUP_METHOD          NULL
 
+#define GROW_BLOCKSIZE_LIMIT 1
+#define GROW_BLOCKSIZE_COUNT 1
+#define GROW_BLOCKSIZE_FACTOR 2
+#define REDUCE_BLOCKSIZE_LIMIT 0.20
+#define REDUCE_BLOCKSIZE_COUNT 2
+#define REDUCE_BLOCKSIZE_FACTOR 0.5
+
 static void gst_soup_http_src_uri_handler_init (gpointer g_iface,
     gpointer iface_data);
 static void gst_soup_http_src_finalize (GObject * gobject);
@@ -176,7 +182,6 @@ static void gst_soup_http_src_got_headers (GstSoupHTTPSrc * src,
 static void gst_soup_http_src_authenticate_cb (SoupSession * session,
     SoupMessage * msg, SoupAuth * auth, gboolean retrying,
     GstSoupHTTPSrc * src);
-static void gst_soup_http_src_destroy_input_stream (GstSoupHTTPSrc * src);
 
 #define gst_soup_http_src_parent_class parent_class
 G_DEFINE_TYPE_WITH_CODE (GstSoupHTTPSrc, gst_soup_http_src, GST_TYPE_PUSH_SRC,
@@ -439,9 +444,15 @@ gst_soup_http_src_reset (GstSoupHTTPSrc * src)
   src->content_size = 0;
   src->have_body = FALSE;
 
+  src->reduce_blocksize_count = 0;
+  src->increase_blocksize_count = 0;
+
   src->ret = GST_FLOW_OK;
   g_cancellable_reset (src->cancellable);
-  gst_soup_http_src_destroy_input_stream (src);
+  if (src->input_stream) {
+    g_object_unref (src->input_stream);
+    src->input_stream = NULL;
+  }
 
   gst_caps_replace (&src->src_caps, NULL);
   g_free (src->iradio_name);
@@ -460,7 +471,6 @@ gst_soup_http_src_init (GstSoupHTTPSrc * src)
   g_mutex_init (&src->mutex);
   g_cond_init (&src->have_headers_cond);
   src->cancellable = g_cancellable_new ();
-  src->poll_context = g_main_context_new ();
   src->location = NULL;
   src->redirection_uri = NULL;
   src->automatic_redirect = TRUE;
@@ -481,6 +491,7 @@ gst_soup_http_src_init (GstSoupHTTPSrc * src)
   src->tls_interaction = DEFAULT_TLS_INTERACTION;
   src->max_retries = DEFAULT_RETRIES;
   src->method = DEFAULT_SOUP_METHOD;
+  src->minimum_blocksize = gst_base_src_get_blocksize (GST_BASE_SRC_CAST (src));
   proxy = g_getenv ("http_proxy");
   if (!gst_soup_http_src_set_proxy (src, proxy)) {
     GST_WARNING_OBJECT (src,
@@ -515,7 +526,6 @@ gst_soup_http_src_finalize (GObject * gobject)
   g_mutex_clear (&src->mutex);
   g_cond_clear (&src->have_headers_cond);
   g_object_unref (src->cancellable);
-  g_main_context_unref (src->poll_context);
   g_free (src->location);
   g_free (src->redirection_uri);
   g_free (src->user_agent);
@@ -766,21 +776,6 @@ gst_soup_http_src_unicodify (const gchar * str)
 }
 
 static void
-gst_soup_http_src_destroy_input_stream (GstSoupHTTPSrc * src)
-{
-  if (src->input_stream) {
-    if (src->poll_source) {
-      g_source_destroy (src->poll_source);
-      g_source_unref (src->poll_source);
-      src->poll_source = NULL;
-    }
-    g_input_stream_close (src->input_stream, src->cancellable, NULL);
-    g_object_unref (src->input_stream);
-    src->input_stream = NULL;
-  }
-}
-
-static void
 gst_soup_http_src_cancel_message (GstSoupHTTPSrc * src)
 {
   g_cancellable_cancel (src->cancellable);
@@ -957,11 +952,16 @@ gst_soup_http_src_session_close (GstSoupHTTPSrc * src)
   GST_DEBUG_OBJECT (src, "Closing session");
 
   g_mutex_lock (&src->mutex);
+  if (src->msg) {
+    soup_session_cancel_message (src->session, src->msg, SOUP_STATUS_CANCELLED);
+    g_object_unref (src->msg);
+    src->msg = NULL;
+  }
+
   if (src->session) {
-    soup_session_abort (src->session);  /* This unrefs the message. */
+    soup_session_abort (src->session);
     g_object_unref (src->session);
     src->session = NULL;
-    src->msg = NULL;
   }
   g_mutex_unlock (&src->mutex);
 }
@@ -988,6 +988,9 @@ insert_http_header (const gchar * name, const gchar * value, gpointer user_data)
   GstStructure *headers = user_data;
   const GValue *gv;
 
+  if (!g_utf8_validate (name, -1, NULL) || !g_utf8_validate (value, -1, NULL))
+    return;
+
   gv = gst_structure_get_value (headers, name);
   if (gv && GST_VALUE_HOLDS_ARRAY (gv)) {
     GValue v = G_VALUE_INIT;
@@ -1035,13 +1038,30 @@ gst_soup_http_src_got_headers (GstSoupHTTPSrc * src, SoupMessage * msg)
     return;
 
   if (src->automatic_redirect && SOUP_STATUS_IS_REDIRECTION (msg->status_code)) {
-    src->redirection_uri = g_strdup (soup_message_headers_get_one
-        (msg->response_headers, "Location"));
-    src->redirection_permanent =
-        (msg->status_code == SOUP_STATUS_MOVED_PERMANENTLY);
-    GST_DEBUG_OBJECT (src, "%u redirect to \"%s\" (permanent %d)",
-        msg->status_code, src->redirection_uri, src->redirection_permanent);
-    return;
+    const gchar *location;
+
+    location = soup_message_headers_get_one (msg->response_headers, "Location");
+
+    if (location) {
+      if (!g_utf8_validate (location, -1, NULL)) {
+        GST_ELEMENT_ERROR_WITH_DETAILS (src, RESOURCE, SEEK,
+            (_("Corrupted HTTP response.")),
+            ("Location header is not valid UTF-8"),
+            ("http-status-code", G_TYPE_UINT, msg->status_code,
+                "http-redirection-uri", G_TYPE_STRING,
+                GST_STR_NULL (src->redirection_uri), NULL));
+        src->ret = GST_FLOW_ERROR;
+        return;
+      }
+
+      src->redirection_uri = g_strdup (location);
+
+      src->redirection_permanent =
+          (msg->status_code == SOUP_STATUS_MOVED_PERMANENTLY);
+      GST_DEBUG_OBJECT (src, "%u redirect to \"%s\" (permanent %d)",
+          msg->status_code, src->redirection_uri, src->redirection_permanent);
+      return;
+    }
   }
 
   if (msg->status_code == SOUP_STATUS_UNAUTHORIZED) {
@@ -1110,46 +1130,70 @@ gst_soup_http_src_got_headers (GstSoupHTTPSrc * src, SoupMessage * msg)
   if ((value =
           soup_message_headers_get_one (msg->response_headers,
               "icy-metaint")) != NULL) {
-    gint icy_metaint = atoi (value);
+    gint icy_metaint;
 
-    GST_DEBUG_OBJECT (src, "icy-metaint: %s (parsed: %d)", value, icy_metaint);
-    if (icy_metaint > 0) {
-      if (src->src_caps)
-        gst_caps_unref (src->src_caps);
+    if (g_utf8_validate (value, -1, NULL)) {
+      icy_metaint = atoi (value);
+
+      GST_DEBUG_OBJECT (src, "icy-metaint: %s (parsed: %d)", value,
+          icy_metaint);
+      if (icy_metaint > 0) {
+        if (src->src_caps)
+          gst_caps_unref (src->src_caps);
 
-      src->src_caps = gst_caps_new_simple ("application/x-icy",
-          "metadata-interval", G_TYPE_INT, icy_metaint, NULL);
+        src->src_caps = gst_caps_new_simple ("application/x-icy",
+            "metadata-interval", G_TYPE_INT, icy_metaint, NULL);
 
-      gst_base_src_set_caps (GST_BASE_SRC (src), src->src_caps);
+        gst_base_src_set_caps (GST_BASE_SRC (src), src->src_caps);
+      }
     }
   }
   if ((value =
           soup_message_headers_get_content_type (msg->response_headers,
               &params)) != NULL) {
-    GST_DEBUG_OBJECT (src, "Content-Type: %s", value);
-    if (g_ascii_strcasecmp (value, "audio/L16") == 0) {
+    if (!g_utf8_validate (value, -1, NULL)) {
+      GST_WARNING_OBJECT (src, "Content-Type is invalid UTF-8");
+    } else if (g_ascii_strcasecmp (value, "audio/L16") == 0) {
       gint channels = 2;
       gint rate = 44100;
       char *param;
 
-      if (src->src_caps)
+      GST_DEBUG_OBJECT (src, "Content-Type: %s", value);
+
+      if (src->src_caps) {
         gst_caps_unref (src->src_caps);
+        src->src_caps = NULL;
+      }
 
       param = g_hash_table_lookup (params, "channels");
-      if (param != NULL)
-        channels = atol (param);
+      if (param != NULL) {
+        guint64 val = g_ascii_strtoull (param, NULL, 10);
+        if (val < 64)
+          channels = val;
+        else
+          channels = 0;
+      }
 
       param = g_hash_table_lookup (params, "rate");
-      if (param != NULL)
-        rate = atol (param);
+      if (param != NULL) {
+        guint64 val = g_ascii_strtoull (param, NULL, 10);
+        if (val < G_MAXINT)
+          rate = val;
+        else
+          rate = 0;
+      }
 
-      src->src_caps = gst_caps_new_simple ("audio/x-unaligned-raw",
-          "format", G_TYPE_STRING, "S16BE",
-          "layout", G_TYPE_STRING, "interleaved",
-          "channels", G_TYPE_INT, channels, "rate", G_TYPE_INT, rate, NULL);
+      if (rate > 0 && channels > 0) {
+        src->src_caps = gst_caps_new_simple ("audio/x-unaligned-raw",
+            "format", G_TYPE_STRING, "S16BE",
+            "layout", G_TYPE_STRING, "interleaved",
+            "channels", G_TYPE_INT, channels, "rate", G_TYPE_INT, rate, NULL);
 
-      gst_base_src_set_caps (GST_BASE_SRC (src), src->src_caps);
+        gst_base_src_set_caps (GST_BASE_SRC (src), src->src_caps);
+      }
     } else {
+      GST_DEBUG_OBJECT (src, "Content-Type: %s", value);
+
       /* Set the Content-Type field on the caps */
       if (src->src_caps) {
         src->src_caps = gst_caps_make_writable (src->src_caps);
@@ -1166,30 +1210,36 @@ gst_soup_http_src_got_headers (GstSoupHTTPSrc * src, SoupMessage * msg)
   if ((value =
           soup_message_headers_get_one (msg->response_headers,
               "icy-name")) != NULL) {
-    g_free (src->iradio_name);
-    src->iradio_name = gst_soup_http_src_unicodify (value);
-    if (src->iradio_name) {
-      gst_tag_list_add (tag_list, GST_TAG_MERGE_REPLACE, GST_TAG_ORGANIZATION,
-          src->iradio_name, NULL);
+    if (g_utf8_validate (value, -1, NULL)) {
+      g_free (src->iradio_name);
+      src->iradio_name = gst_soup_http_src_unicodify (value);
+      if (src->iradio_name) {
+        gst_tag_list_add (tag_list, GST_TAG_MERGE_REPLACE, GST_TAG_ORGANIZATION,
+            src->iradio_name, NULL);
+      }
     }
   }
   if ((value =
           soup_message_headers_get_one (msg->response_headers,
               "icy-genre")) != NULL) {
-    g_free (src->iradio_genre);
-    src->iradio_genre = gst_soup_http_src_unicodify (value);
-    if (src->iradio_genre) {
-      gst_tag_list_add (tag_list, GST_TAG_MERGE_REPLACE, GST_TAG_GENRE,
-          src->iradio_genre, NULL);
+    if (g_utf8_validate (value, -1, NULL)) {
+      g_free (src->iradio_genre);
+      src->iradio_genre = gst_soup_http_src_unicodify (value);
+      if (src->iradio_genre) {
+        gst_tag_list_add (tag_list, GST_TAG_MERGE_REPLACE, GST_TAG_GENRE,
+            src->iradio_genre, NULL);
+      }
     }
   }
   if ((value = soup_message_headers_get_one (msg->response_headers, "icy-url"))
       != NULL) {
-    g_free (src->iradio_url);
-    src->iradio_url = gst_soup_http_src_unicodify (value);
-    if (src->iradio_url) {
-      gst_tag_list_add (tag_list, GST_TAG_MERGE_REPLACE, GST_TAG_LOCATION,
-          src->iradio_url, NULL);
+    if (g_utf8_validate (value, -1, NULL)) {
+      g_free (src->iradio_url);
+      src->iradio_url = gst_soup_http_src_unicodify (value);
+      if (src->iradio_url) {
+        gst_tag_list_add (tag_list, GST_TAG_MERGE_REPLACE, GST_TAG_LOCATION,
+            src->iradio_url, NULL);
+      }
     }
   }
   if (!gst_tag_list_is_empty (tag_list)) {
@@ -1207,10 +1257,13 @@ gst_soup_http_src_got_headers (GstSoupHTTPSrc * src, SoupMessage * msg)
   if (src->ret == GST_FLOW_CUSTOM_ERROR &&
       src->read_position && msg->status_code != SOUP_STATUS_PARTIAL_CONTENT) {
     src->seekable = FALSE;
-    GST_ELEMENT_ERROR (src, RESOURCE, SEEK,
+    GST_ELEMENT_ERROR_WITH_DETAILS (src, RESOURCE, SEEK,
         (_("Server does not support seeking.")),
         ("Server does not accept Range HTTP header, URL: %s, Redirect to: %s",
-            src->location, GST_STR_NULL (src->redirection_uri)));
+            src->location, GST_STR_NULL (src->redirection_uri)),
+        ("http-status-code", G_TYPE_UINT, msg->status_code,
+            "http-redirection-uri", G_TYPE_STRING,
+            GST_STR_NULL (src->redirection_uri), NULL));
     src->ret = GST_FLOW_ERROR;
   }
 
@@ -1239,9 +1292,13 @@ gst_soup_http_src_alloc_buffer (GstSoupHTTPSrc * src)
 }
 
 #define SOUP_HTTP_SRC_ERROR(src,soup_msg,cat,code,error_message)     \
-  GST_ELEMENT_ERROR ((src), cat, code, ("%s", error_message),        \
-      ("%s (%d), URL: %s, Redirect to: %s", (soup_msg)->reason_phrase,                \
-          (soup_msg)->status_code, (src)->location, GST_STR_NULL ((src)->redirection_uri)));
+  do { \
+    GST_ELEMENT_ERROR_WITH_DETAILS ((src), cat, code, ("%s", error_message), \
+        ("%s (%d), URL: %s, Redirect to: %s", (soup_msg)->reason_phrase, \
+            (soup_msg)->status_code, (src)->location, GST_STR_NULL ((src)->redirection_uri)), \
+            ("http-status-code", G_TYPE_UINT, msg->status_code, \
+             "http-redirect-uri", G_TYPE_STRING, GST_STR_NULL ((src)->redirection_uri), NULL)); \
+  } while(0)
 
 static void
 gst_soup_http_src_parse_status (SoupMessage * msg, GstSoupHTTPSrc * src)
@@ -1291,6 +1348,14 @@ gst_soup_http_src_parse_status (SoupMessage * msg, GstSoupHTTPSrc * src)
   } else if (SOUP_STATUS_IS_CLIENT_ERROR (msg->status_code) ||
       SOUP_STATUS_IS_REDIRECTION (msg->status_code) ||
       SOUP_STATUS_IS_SERVER_ERROR (msg->status_code)) {
+    const gchar *reason_phrase;
+
+    reason_phrase = msg->reason_phrase;
+    if (reason_phrase && !g_utf8_validate (reason_phrase, -1, NULL)) {
+      GST_ERROR_OBJECT (src, "Invalid UTF-8 in reason");
+      reason_phrase = "(invalid)";
+    }
+
     /* Report HTTP error. */
 
     /* when content_size is unknown and we have just finished receiving
@@ -1308,25 +1373,33 @@ gst_soup_http_src_parse_status (SoupMessage * msg, GstSoupHTTPSrc * src)
      * error dialog according to libsoup documentation.
      */
     if (msg->status_code == SOUP_STATUS_NOT_FOUND) {
-      GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND,
-          ("%s", msg->reason_phrase),
-          ("%s (%d), URL: %s, Redirect to: %s", msg->reason_phrase,
+      GST_ELEMENT_ERROR_WITH_DETAILS (src, RESOURCE, NOT_FOUND,
+          ("%s", reason_phrase),
+          ("%s (%d), URL: %s, Redirect to: %s", reason_phrase,
               msg->status_code, src->location,
-              GST_STR_NULL (src->redirection_uri)));
+              GST_STR_NULL (src->redirection_uri)),
+          ("http-status-code", G_TYPE_UINT, msg->status_code,
+              "http-redirect-uri", G_TYPE_STRING,
+              GST_STR_NULL (src->redirection_uri), NULL));
     } else if (msg->status_code == SOUP_STATUS_UNAUTHORIZED
         || msg->status_code == SOUP_STATUS_PAYMENT_REQUIRED
         || msg->status_code == SOUP_STATUS_FORBIDDEN
         || msg->status_code == SOUP_STATUS_PROXY_AUTHENTICATION_REQUIRED) {
-      GST_ELEMENT_ERROR (src, RESOURCE, NOT_AUTHORIZED, ("%s",
-              msg->reason_phrase), ("%s (%d), URL: %s, Redirect to: %s",
-              msg->reason_phrase, msg->status_code, src->location,
-              GST_STR_NULL (src->redirection_uri)));
+      GST_ELEMENT_ERROR_WITH_DETAILS (src, RESOURCE, NOT_AUTHORIZED, ("%s",
+              reason_phrase), ("%s (%d), URL: %s, Redirect to: %s",
+              reason_phrase, msg->status_code, src->location,
+              GST_STR_NULL (src->redirection_uri)), ("http-status-code",
+              G_TYPE_UINT, msg->status_code, "http-redirect-uri", G_TYPE_STRING,
+              GST_STR_NULL (src->redirection_uri), NULL));
     } else {
-      GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ,
-          ("%s", msg->reason_phrase),
-          ("%s (%d), URL: %s, Redirect to: %s", msg->reason_phrase,
+      GST_ELEMENT_ERROR_WITH_DETAILS (src, RESOURCE, OPEN_READ,
+          ("%s", reason_phrase),
+          ("%s (%d), URL: %s, Redirect to: %s", reason_phrase,
               msg->status_code, src->location,
-              GST_STR_NULL (src->redirection_uri)));
+              GST_STR_NULL (src->redirection_uri)),
+          ("http-status-code", G_TYPE_UINT, msg->status_code,
+              "http-redirect-uri", G_TYPE_STRING,
+              GST_STR_NULL (src->redirection_uri), NULL));
     }
     src->ret = GST_FLOW_ERROR;
   }
@@ -1370,25 +1443,11 @@ gst_soup_http_src_build_message (GstSoupHTTPSrc * src, const gchar * method)
   return TRUE;
 }
 
-static void
-gst_soup_http_src_check_input_stream_interfaces (GstSoupHTTPSrc * src)
-{
-  if (!src->input_stream)
-    return;
-
-  src->has_pollable_interface = G_IS_POLLABLE_INPUT_STREAM (src->input_stream)
-      && g_pollable_input_stream_can_poll ((GPollableInputStream *)
-      src->input_stream);
-}
-
 static GstFlowReturn
 gst_soup_http_src_send_message (GstSoupHTTPSrc * src)
 {
   g_return_val_if_fail (src->msg != NULL, GST_FLOW_ERROR);
 
-  g_assert (src->input_stream == NULL);
-  g_assert (src->poll_source == NULL);
-
   /* FIXME We are ignoring the GError here, might be useful to debug */
   src->input_stream =
       soup_session_send (src->session, src->msg, src->cancellable, NULL);
@@ -1413,8 +1472,6 @@ gst_soup_http_src_send_message (GstSoupHTTPSrc * src)
     return GST_FLOW_ERROR;
   }
 
-  gst_soup_http_src_check_input_stream_interfaces (src);
-
   return GST_FLOW_OK;
 }
 
@@ -1457,6 +1514,46 @@ done:
   return src->ret;
 }
 
+/*
+ * Check if the bytes_read is above a certain threshold of the blocksize, if
+ * that happens a few times in a row, increase the blocksize; Do the same in
+ * the opposite direction to reduce the blocksize.
+ */
+static void
+gst_soup_http_src_check_update_blocksize (GstSoupHTTPSrc * src,
+    gint64 bytes_read)
+{
+  guint blocksize = gst_base_src_get_blocksize (GST_BASE_SRC_CAST (src));
+
+  GST_LOG_OBJECT (src, "Checking to update blocksize. Read:%" G_GINT64_FORMAT
+      " blocksize:%u", bytes_read, blocksize);
+
+  if (bytes_read >= blocksize * GROW_BLOCKSIZE_LIMIT) {
+    src->reduce_blocksize_count = 0;
+    src->increase_blocksize_count++;
+
+    if (src->increase_blocksize_count >= GROW_BLOCKSIZE_COUNT) {
+      blocksize *= GROW_BLOCKSIZE_FACTOR;
+      GST_DEBUG_OBJECT (src, "Increased blocksize to %u", blocksize);
+      gst_base_src_set_blocksize (GST_BASE_SRC_CAST (src), blocksize);
+      src->increase_blocksize_count = 0;
+    }
+  } else if (bytes_read < blocksize * REDUCE_BLOCKSIZE_LIMIT) {
+    src->reduce_blocksize_count++;
+    src->increase_blocksize_count = 0;
+
+    if (src->reduce_blocksize_count >= REDUCE_BLOCKSIZE_COUNT) {
+      blocksize *= REDUCE_BLOCKSIZE_FACTOR;
+      blocksize = MAX (blocksize, src->minimum_blocksize);
+      GST_DEBUG_OBJECT (src, "Decreased blocksize to %u", blocksize);
+      gst_base_src_set_blocksize (GST_BASE_SRC_CAST (src), blocksize);
+      src->reduce_blocksize_count = 0;
+    }
+  } else {
+    src->reduce_blocksize_count = src->increase_blocksize_count = 0;
+  }
+}
+
 static void
 gst_soup_http_src_update_position (GstSoupHTTPSrc * src, gint64 bytes_read)
 {
@@ -1483,38 +1580,6 @@ gst_soup_http_src_update_position (GstSoupHTTPSrc * src, gint64 bytes_read)
   }
 }
 
-static gboolean
-_gst_soup_http_src_data_available_callback (GObject * pollable_stream,
-    gpointer udata)
-{
-  GstSoupHTTPSrc *src = udata;
-
-  src->have_data = TRUE;
-  return TRUE;
-}
-
-/* Need to wait on a gsource to know when data is available */
-static gboolean
-gst_soup_http_src_wait_for_data (GstSoupHTTPSrc * src)
-{
-  src->have_data = FALSE;
-
-  if (!src->poll_source) {
-    src->poll_source =
-        g_pollable_input_stream_create_source ((GPollableInputStream *)
-        src->input_stream, src->cancellable);
-    g_source_set_callback (src->poll_source,
-        (GSourceFunc) _gst_soup_http_src_data_available_callback, src, NULL);
-    g_source_attach (src->poll_source, src->poll_context);
-  }
-
-  while (!src->have_data && !g_cancellable_is_cancelled (src->cancellable)) {
-    g_main_context_iteration (src->poll_context, TRUE);
-  }
-
-  return src->have_data;
-}
-
 static GstFlowReturn
 gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
 {
@@ -1522,7 +1587,6 @@ gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
   GstMapInfo mapinfo;
   GstBaseSrc *bsrc;
   GstFlowReturn ret;
-  GError *err = NULL;
 
   bsrc = GST_BASE_SRC_CAST (src);
 
@@ -1537,34 +1601,9 @@ gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
     return GST_FLOW_ERROR;
   }
 
-  if (src->has_pollable_interface) {
-    while (1) {
-      read_bytes =
-          g_pollable_input_stream_read_nonblocking ((GPollableInputStream *)
-          src->input_stream, mapinfo.data, mapinfo.size, src->cancellable,
-          &err);
-      if (read_bytes == -1) {
-        if (err && g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
-          g_error_free (err);
-          err = NULL;
-
-          /* no data yet, wait */
-          if (gst_soup_http_src_wait_for_data (src))
-            /* retry */
-            continue;
-        }
-      }
-      break;
-    }
-  } else {
-    read_bytes =
-        g_input_stream_read (src->input_stream, mapinfo.data, mapinfo.size,
-        src->cancellable, NULL);
-  }
-
-  if (err)
-    g_error_free (err);
-
+  read_bytes =
+      g_input_stream_read (src->input_stream, mapinfo.data, mapinfo.size,
+      src->cancellable, NULL);
   GST_DEBUG_OBJECT (src, "Read %" G_GSSIZE_FORMAT " bytes from http input",
       read_bytes);
 
@@ -1575,7 +1614,6 @@ gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
     g_mutex_unlock (&src->mutex);
     return GST_FLOW_FLUSHING;
   }
-  g_mutex_unlock (&src->mutex);
 
   gst_buffer_unmap (*outbuf, &mapinfo);
   if (read_bytes > 0) {
@@ -1586,16 +1624,43 @@ gst_soup_http_src_read_buffer (GstSoupHTTPSrc * src, GstBuffer ** outbuf)
 
     /* Got some data, reset retry counter */
     src->retry_count = 0;
+
+    gst_soup_http_src_check_update_blocksize (src, read_bytes);
+
+    /* If we're at the end of a range request, read again to let libsoup
+     * finalize the request. This allows to reuse the connection again later,
+     * otherwise we would have to cancel the message and close the connection
+     */
+    if (bsrc->segment.stop != -1
+        && bsrc->segment.position + read_bytes >= bsrc->segment.stop) {
+      guint8 tmp[128];
+
+      g_object_unref (src->msg);
+      src->msg = NULL;
+      src->have_body = TRUE;
+
+      /* This should return immediately as we're at the end of the range */
+      read_bytes =
+          g_input_stream_read (src->input_stream, tmp, sizeof (tmp),
+          src->cancellable, NULL);
+      if (read_bytes > 0)
+        GST_ERROR_OBJECT (src,
+            "Read %" G_GSIZE_FORMAT " bytes after end of range", read_bytes);
+    }
   } else {
     gst_buffer_unref (*outbuf);
     if (read_bytes < 0) {
       /* Maybe the server disconnected, retry */
       ret = GST_FLOW_CUSTOM_ERROR;
     } else {
+      g_object_unref (src->msg);
+      src->msg = NULL;
       ret = GST_FLOW_EOS;
       src->have_body = TRUE;
     }
   }
+  g_mutex_unlock (&src->mutex);
+
   return ret;
 }
 
@@ -1613,7 +1678,11 @@ retry:
 
   /* Check for pending position change */
   if (src->request_position != src->read_position) {
-    gst_soup_http_src_destroy_input_stream (src);
+    if (src->input_stream) {
+      g_input_stream_close (src->input_stream, src->cancellable, NULL);
+      g_object_unref (src->input_stream);
+      src->input_stream = NULL;
+    }
   }
 
   if (g_cancellable_is_cancelled (src->cancellable)) {
@@ -1650,7 +1719,10 @@ done:
       gst_event_unref (http_headers_event);
 
     g_mutex_lock (&src->mutex);
-    gst_soup_http_src_destroy_input_stream (src);
+    if (src->input_stream) {
+      g_object_unref (src->input_stream);
+      src->input_stream = NULL;
+    }
     g_mutex_unlock (&src->mutex);
     if (ret == GST_FLOW_CUSTOM_ERROR)
       goto retry;