curlhttpsrc: add support for range GET
authorAlex Ashley <alex.ashley@youview.com>
Fri, 24 May 2019 16:33:33 +0000 (17:33 +0100)
committerTim-Philipp Müller <tim@centricular.com>
Sun, 17 Nov 2019 14:28:25 +0000 (14:28 +0000)
To allow curlhttpsrc to support DASH streams that use the on-demand
profile, it needs to support HTTP Range GETs. In GStreamer, the RANGE
is specified by issuing a GST_FORMAT_BYTES seek to set the start and
end of the range. curlhttpsrc needs to implement seek and set the
appropriate curl options to make it add the Range header to the
request.

ext/curl/gstcurlhttpsrc.c
ext/curl/gstcurlhttpsrc.h
tests/check/elements/curlhttpsrc.c

index 50557b0..9b41fa9 100644 (file)
@@ -193,6 +193,9 @@ static void gst_curl_http_src_cleanup_instance (GstCurlHttpSrc * src);
 static gboolean gst_curl_http_src_query (GstBaseSrc * bsrc, GstQuery * query);
 static gboolean gst_curl_http_src_get_content_length (GstBaseSrc * bsrc,
     guint64 * size);
+static gboolean gst_curl_http_src_is_seekable (GstBaseSrc * bsrc);
+static gboolean gst_curl_http_src_do_seek (GstBaseSrc * bsrc,
+    GstSegment * segment);
 static gboolean gst_curl_http_src_unlock (GstBaseSrc * bsrc);
 static gboolean gst_curl_http_src_unlock_stop (GstBaseSrc * bsrc);
 
@@ -278,6 +281,9 @@ gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass)
   gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_curl_http_src_query);
   gstbasesrc_class->get_size =
       GST_DEBUG_FUNCPTR (gst_curl_http_src_get_content_length);
+  gstbasesrc_class->is_seekable =
+      GST_DEBUG_FUNCPTR (gst_curl_http_src_is_seekable);
+  gstbasesrc_class->do_seek = GST_DEBUG_FUNCPTR (gst_curl_http_src_do_seek);
   gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_http_src_unlock);
   gstbasesrc_class->unlock_stop =
       GST_DEBUG_FUNCPTR (gst_curl_http_src_unlock_stop);
@@ -702,6 +708,10 @@ gst_curl_http_src_init (GstCurlHttpSrc * source)
   source->retries_remaining = source->total_retries;
   source->slist = NULL;
   source->accept_compressed_encodings = FALSE;
+  source->seekable = GSTCURL_SEEKABLE_UNKNOWN;
+  source->content_size = 0;
+  source->request_position = 0;
+  source->stop_position = -1;
 
   gst_base_src_set_automatic_eos (GST_BASE_SRC (source), FALSE);
 
@@ -1139,6 +1149,23 @@ gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s)
   gst_curl_setopt_bool (s, handle, CURLOPT_SSL_VERIFYPEER, s->strict_ssl);
   gst_curl_setopt_str (s, handle, CURLOPT_CAINFO, s->custom_ca_file);
 
+  if (s->request_position || s->stop_position > 0) {
+    gchar *range;
+    if (s->stop_position < 1) {
+      /* start specified, no end specified */
+      range = g_strdup_printf ("%" G_GINT64_FORMAT "-", s->request_position);
+    } else {
+      /* in GStreamer the end position indicates the first byte that is not
+         in the range, whereas in HTTP the Content-Range header includes the
+         byte listed in the end value */
+      range = g_strdup_printf ("%" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
+          s->request_position, s->stop_position - 1);
+    }
+    GST_TRACE_OBJECT (s, "Requesting range: %s", range);
+    curl_easy_setopt (handle, CURLOPT_RANGE, range);
+    g_free (range);
+  }
+
   switch (s->preferred_http_version) {
     case GSTCURL_HTTP_VERSION_1_0:
       GST_DEBUG_OBJECT (s, "Setting version as HTTP/1.0");
@@ -1192,6 +1219,7 @@ gst_curl_http_src_handle_response (GstCurlHttpSrc * src)
 {
   glong curl_info_long;
   gdouble curl_info_dbl;
+  curl_off_t curl_info_offt;
   gchar *redirect_url;
   GstBaseSrc *basesrc;
   const GValue *response_headers;
@@ -1284,15 +1312,25 @@ gst_curl_http_src_handle_response (GstCurlHttpSrc * src)
   /*
    * Push the content length
    */
-  if (curl_easy_getinfo (src->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD,
-          &curl_info_dbl) == CURLE_OK) {
-    if (curl_info_dbl == -1) {
+  if (curl_easy_getinfo (src->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T,
+          &curl_info_offt) == CURLE_OK) {
+    if (curl_info_offt == -1) {
       GST_WARNING_OBJECT (src,
           "No Content-Length was specified in the response.");
+      src->seekable = GSTCURL_SEEKABLE_FALSE;
     } else {
-      GST_INFO_OBJECT (src, "Content-Length was given as %.0f", curl_info_dbl);
+      /* Note that in the case of a range get, Content-Length is the number
+         of bytes requested, not the total size of the resource */
+      GST_INFO_OBJECT (src, "Content-Length was given as %" G_GUINT64_FORMAT,
+          curl_info_offt);
+      if (src->content_size == 0) {
+        src->content_size = src->request_position + curl_info_offt;
+      }
       basesrc = GST_BASE_SRC_CAST (src);
-      basesrc->segment.duration = curl_info_dbl;
+      basesrc->segment.duration = src->request_position + curl_info_offt;
+      if (src->seekable == GSTCURL_SEEKABLE_UNKNOWN) {
+        src->seekable = GSTCURL_SEEKABLE_TRUE;
+      }
       gst_element_post_message (GST_ELEMENT (src),
           gst_message_new_duration_changed (GST_OBJECT (src)));
     }
@@ -1527,6 +1565,59 @@ gst_curl_http_src_get_content_length (GstBaseSrc * bsrc, guint64 * size)
   return ret;
 }
 
+static gboolean
+gst_curl_http_src_is_seekable (GstBaseSrc * bsrc)
+{
+  GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
+
+  /* NOTE: if seekable is UNKNOWN, assume yes */
+  return src->seekable != GSTCURL_SEEKABLE_FALSE;
+}
+
+static gboolean
+gst_curl_http_src_do_seek (GstBaseSrc * bsrc, GstSegment * segment)
+{
+  GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
+  gboolean ret = TRUE;
+
+  g_mutex_lock (&src->buffer_mutex);
+  GST_DEBUG_OBJECT (src, "do_seek(%" G_GINT64_FORMAT ", %" G_GINT64_FORMAT
+      ")", segment->start, segment->stop);
+  if (src->state == GSTCURL_UNLOCK) {
+    GST_WARNING_OBJECT (src, "Attempt to seek while unlocked");
+    ret = FALSE;
+    goto done;
+  }
+  if (src->request_position == segment->start &&
+      src->stop_position == segment->stop) {
+    GST_DEBUG_OBJECT (src, "Seek to current read/end position");
+    goto done;
+  }
+
+  if (src->seekable == GSTCURL_SEEKABLE_FALSE) {
+    GST_WARNING_OBJECT (src, "Not seekable");
+    ret = FALSE;
+    goto done;
+  }
+
+  if (segment->rate < 0.0 || segment->format != GST_FORMAT_BYTES) {
+    GST_WARNING_OBJECT (src, "Invalid seek segment");
+    ret = FALSE;
+    goto done;
+  }
+
+  if (src->content_size > 0 && segment->start >= src->content_size) {
+    GST_WARNING_OBJECT (src,
+        "Potentially seeking beyond end of file, might EOS immediately");
+  }
+
+  src->request_position = segment->start;
+  src->stop_position = segment->stop;
+done:
+  g_mutex_unlock (&src->buffer_mutex);
+  return ret;
+}
+
 static void
 gst_curl_http_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
 {
@@ -1893,6 +1984,17 @@ gst_curl_http_src_get_header (void *header, size_t size, size_t nmemb,
       /* We have some special cases - deal with them here */
       if (g_strcmp0 (header_key, "content-type") == 0) {
         gst_curl_http_src_negotiate_caps (src);
+      } else if (g_strcmp0 (header_key, "accept-ranges") == 0 &&
+          g_ascii_strcasecmp (header_value, "none") == 0) {
+        s->seekable = GSTCURL_SEEKABLE_FALSE;
+      } else if (g_strcmp0 (header_key, "content-range") == 0) {
+        /* In the case of a Range GET, the Content-Length header will contain
+           the size of range requested, and the Content-Range header will
+           have the start, stop and total size of the resource */
+        gchar *size = strchr (header_value, '/');
+        if (size) {
+          s->content_size = atoi (size);
+        }
       }
 
       g_free (header_key);
index 36d1de4..efb8a25 100644 (file)
@@ -102,6 +102,13 @@ typedef enum
     GSTCURL_HTTP_VERSION_MAX
   } GstCurlHttpVersion;
 
+typedef enum _GstCGstCurlHttpSrcSeekable
+{
+    GSTCURL_SEEKABLE_UNKNOWN,
+    GSTCURL_SEEKABLE_TRUE,
+    GSTCURL_SEEKABLE_FALSE
+} GstCGstCurlHttpSrcSeekable;
+
 struct _GstCurlHttpSrcMultiTaskContext
 {
   GstTask     *task;
@@ -158,6 +165,8 @@ struct _GstCurlHttpSrc
   GstStructure *request_headers;  /* CURLOPT_HTTPHEADER */
   struct curl_slist *slist;
   gboolean accept_compressed_encodings; /* CURLOPT_ACCEPT_ENCODING */
+  gint64 request_position;     /* Seek to this position. */
+  gint64 stop_position;        /* Stop at this position. */
 
   /* Connection options */
   glong allow_3xx_redirect;     /* CURLOPT_FOLLOWLOCATION */
@@ -213,6 +222,8 @@ struct _GstCurlHttpSrc
   guint status_code;
   gchar *reason_phrase;
   gboolean hdrs_updated;
+  guint64 content_size;
+  GstCGstCurlHttpSrcSeekable seekable;
 
   CURLcode curl_result;
   char curl_errbuf[CURL_ERROR_SIZE];
index fa6288f..7398003 100644 (file)
@@ -40,12 +40,21 @@ typedef struct _GioHttpServer
   guint64 delay;
 } GioHttpServer;
 
+typedef struct _HttpHeader
+{
+  gchar *header;
+  gchar *value;
+} HttpHeader;
+
 typedef struct _HttpRequest
 {
   gchar *method;
   gchar *version;
   gchar *path;
   gchar *query;
+  gint64 range_start;
+  gint64 range_stop;
+  GSList *headers;
 } HttpRequest;
 
 static GioHttpServer *run_server (void);
@@ -53,21 +62,24 @@ static void stop_server (GioHttpServer * server);
 static guint16 get_port_from_server (GioHttpServer * server);
 
 static const gchar *STATUS_OK = "200 OK";
+static const gchar *STATUS_PARTIAL_CONTENT = "206 OK";
 static const gchar *STATUS_MOVED_PERMANENTLY = "301 Moved Permanently";
 static const gchar *STATUS_MOVED_TEMPORARILY = "302 Moved Temporarily";
 static const gchar *STATUS_TEMPORARY_REDIRECT = "307 Temporary Redirect";
 static const gchar *STATUS_FORBIDDEN = "403 Forbidden";
 static const gchar *STATUS_NOT_FOUND = "404 Not Found";
 
+static const guint64 http_content_length = G_GUINT64_CONSTANT (1024);
+
 static void
 do_get (GioHttpServer * server, const HttpRequest * req, GOutputStream * out)
 {
   gboolean send_error_doc = FALSE;
-  int buflen = 1024;
   const gchar *status = STATUS_OK;
   const gchar *content_type = "application/octet-stream";
+  guint64 buflen;
   GString *s;
-  char *buf = NULL;
+  gpointer *buf = NULL;
   gsize written = 0;
 
   GST_DEBUG ("%s request: \"%s\"", req->method, req->path);
@@ -86,6 +98,10 @@ do_get (GioHttpServer * server, const HttpRequest * req, GOutputStream * out)
     status = STATUS_NOT_FOUND;
     send_error_doc = TRUE;
   }
+  if (g_strcmp0 (req->method, "GET") == 0 &&
+      (req->range_start > 0 || req->range_stop >= 0)) {
+    status = STATUS_PARTIAL_CONTENT;
+  }
   s = g_string_new ("HTTP/");
   g_string_append_printf (s, "%s %s\r\n", req->version, status);
 
@@ -93,13 +109,33 @@ do_get (GioHttpServer * server, const HttpRequest * req, GOutputStream * out)
     g_string_append_printf (s, "Location: %s-redirected\r\n", req->path);
   }
 
-  if (status == STATUS_OK || send_error_doc) {
+  if (g_strcmp0 (req->method, "GET") == 0
+      || g_strcmp0 (req->method, "HEAD") == 0) {
+    g_string_append_printf (s, "Accept-Ranges: bytes\r\n");
+  }
+  if (status == STATUS_OK || status == STATUS_PARTIAL_CONTENT || send_error_doc) {
     g_string_append_printf (s, "Content-Type: %s\r\n", content_type);
-    g_string_append_printf (s, "Content-Length: %lu\r\n", (gulong) buflen);
-    if (!g_strcmp0 (req->method, "GET")) {
-      buf = g_malloc (buflen);
-      memset (buf, 0, buflen);
+    buflen = http_content_length;
+    if (req->range_start > 0 && req->range_stop >= 0) {
+      buflen = 1 + MIN (req->range_stop, buflen - 1) - req->range_start;
+    } else if (req->range_start > 0) {
+      buflen = buflen - req->range_start;
+    } else if (req->range_stop >= 0) {
+      buflen = 1 + MIN (req->range_stop, buflen - 1);
+    }
+    if (buflen != http_content_length) {
+      g_string_append_printf (s, "Content-Range: bytes %" G_GINT64_FORMAT "-%"
+          G_GINT64_FORMAT "/%" G_GUINT64_FORMAT "\r\n",
+          req->range_start,
+          req->range_stop >= 0 ? req->range_stop : (http_content_length - 1),
+          http_content_length);
     }
+    GST_TRACE ("buflen = %" G_GUINT64_FORMAT " range = %" G_GINT64_FORMAT
+        " -> %" G_GINT64_FORMAT, buflen, req->range_start, req->range_stop);
+    buf = g_malloc (buflen);
+    memset (buf, 0, buflen);
+    g_string_append_printf (s, "Content-Length: %" G_GUINT64_FORMAT "\r\n",
+        buflen);
   }
 
   g_string_append (s, "\r\n");
@@ -126,6 +162,59 @@ send_error (GOutputStream * out, int error_code, const gchar * reason)
   g_free (res);
 }
 
+static HttpHeader *
+http_header_new (const gchar * header, const gchar * value)
+{
+  HttpHeader *ret;
+
+  ret = g_slice_new (HttpHeader);
+  ret->header = g_strdup (header);
+  ret->value = g_strdup (value);
+  return ret;
+}
+
+static void
+http_header_free (HttpHeader * header)
+{
+  if (header) {
+    g_free (header->header);
+    g_free (header->value);
+    g_slice_free (HttpHeader, header);
+  }
+}
+
+static HttpRequest *
+http_request_new (const gchar * method, const gchar * version,
+    const gchar * path, const gchar * query)
+{
+  HttpRequest *req;
+
+  req = g_slice_new0 (HttpRequest);
+  req->method = g_strdup (method);
+  if (version)
+    req->version = g_strdup (version);
+  req->path = g_uri_unescape_string (path, NULL);
+  if (query)
+    req->query = g_strdup (query);
+  req->range_start = 0;
+  req->range_stop = -1;
+  return req;
+}
+
+static void
+http_request_free (HttpRequest * req)
+{
+  if (!req)
+    return;
+  g_free (req->method);
+  g_free (req->version);
+  g_free (req->path);
+  g_free (req->query);
+  if (req->headers)
+    g_slist_free_full (req->headers, (GDestroyNotify) http_header_free);
+  g_slice_free (HttpRequest, req);
+}
+
 static gboolean
 server_callback (GThreadedSocketService * service,
     GSocketConnection * connection,
@@ -135,8 +224,10 @@ server_callback (GThreadedSocketService * service,
   GOutputStream *out;
   GInputStream *in;
   GDataInputStream *data = NULL;
-  char *line = NULL, *escaped, *tmp;
-  HttpRequest req;
+  gchar *line = NULL, *escaped, *tmp;
+  HttpRequest *req = NULL;
+  gboolean done = FALSE;
+  gchar *version = NULL, *query;
 
   in = g_io_stream_get_input_stream (G_IO_STREAM (connection));
   out = g_io_stream_get_output_stream (G_IO_STREAM (connection));
@@ -157,35 +248,79 @@ server_callback (GThreadedSocketService * service,
     send_error (out, 400, "Invalid request");
     goto out;
   }
-  req.method = line;
   *tmp = '\0';
   escaped = tmp + 1;
 
-  req.version = NULL;
   tmp = strchr (escaped, ' ');
   if (tmp != NULL) {
     *tmp = 0;
-    req.version = tmp + 6;      /* skip "HTTP/" from version field */
+    version = tmp + 6;          /* skip "HTTP/" from version field */
   }
 
-  req.query = strchr (escaped, '?');
-  if (req.query != NULL) {
-    *req.query = '\0';
-    req.query++;
+  query = strchr (escaped, '?');
+  if (query != NULL) {
+    *query = '\0';
+    query++;
   }
 
-  req.path = g_uri_unescape_string (escaped, NULL);
+  req = http_request_new (line, version, escaped, query);
 
-  GST_DEBUG ("%s %s HTTP/%s", req.method, req.path, req.version);
+  GST_TRACE ("%s %s HTTP/%s", req->method, req->path, req->version);
 
+  while (!done) {
+    g_free (line);
+    line = g_data_input_stream_read_line (data, NULL, NULL, NULL);
+    if (!line) {
+      send_error (out, 400, "Invalid request");
+      goto out;
+    }
+    tmp = strchr (line, ':');
+    if (!tmp) {
+      /* reached end of HTTP request headers */
+      done = TRUE;
+      continue;
+    }
+    *tmp = '\0';
+    do {
+      ++tmp;
+    } while (*tmp == ' ');
+    GST_TRACE ("Request header: %s: %s", line, tmp);
+    req->headers = g_slist_append (req->headers, http_header_new (line, tmp));
+    if (g_ascii_strcasecmp (line, "range") == 0) {
+      gchar *start, *end;
+      start = strchr (tmp, '=');
+      if (!start) {
+        GST_ERROR ("Invalid range request: %s", tmp);
+        send_error (out, 400, "Invalid request");
+        goto out;
+      }
+      start++;
+      end = strchr (start, '-');
+      if (!end) {
+        GST_ERROR ("Invalid range request: %s", tmp);
+        send_error (out, 400, "Invalid request");
+        goto out;
+      }
+      *end = '\0';
+      end++;
+      if (*start != '\0') {
+        req->range_start = atoi (start);
+      }
+      if (*end != '\0') {
+        req->range_stop = atoi (end);
+      }
+      GST_DEBUG ("RANGE request %" G_GINT64_FORMAT " -> %" G_GINT64_FORMAT,
+          req->range_start, req->range_stop);
+    }
+  }
   if (server->delay) {
     g_usleep (server->delay);
   }
-  do_get (server, &req, out);
+  do_get (server, req, out);
 
-  g_free (req.path);
 out:
   g_free (line);
+  http_request_free (req);
   if (data)
     g_object_unref (data);
 
@@ -322,7 +457,6 @@ run_test (const gchar * path, gint expected_status_code,
         gint rc = -1;
         const GstStructure *details = NULL;
 
-        fail_unless (has_error);
         gst_message_parse_error (msg, &err, &debug);
         gst_message_parse_error_details (msg, &details);
         GST_DEBUG ("debug object: %s", debug);
@@ -346,6 +480,7 @@ run_test (const gchar * path, gint expected_status_code,
         }
         g_error_free (err);
         g_free (debug);
+        fail_unless (has_error);
         GST_DEBUG ("Got HTTP error %d, expected_status_code %d", rc,
             expected_status_code);
         res = (rc == expected_status_code);
@@ -452,6 +587,8 @@ typedef struct _HttpSrcTestDownloader
   GstElement *sink;
   GioHttpServer *server;
   guint count;
+  gint64 start_position;
+  gint64 stop_position;
 } HttpSrcTestDownloader;
 
 static gboolean
@@ -490,11 +627,22 @@ start_next_download (HttpSrcTestDownloader * tp)
   GST_DEBUG_OBJECT (tp->bin, "Start next request for: %s", url);
   g_object_set (tp->src, "location", url, NULL);
   g_free (url);
+  if (tp->start_position != 0 || tp->stop_position != -1) {
+    /* Send the seek event to the uri_handler, as the other pipeline elements
+     * can't handle it when READY. */
+    GST_DEBUG ("Range get %" G_GINT64_FORMAT " -> %" G_GINT64_FORMAT,
+        tp->start_position, tp->stop_position);
+    fail_if (!gst_element_send_event (tp->src, gst_event_new_seek (1.0,
+                GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
+                GST_SEEK_TYPE_SET, tp->start_position, GST_SEEK_TYPE_SET,
+                tp->stop_position + 1)),
+        "Source element can't handle range requests");
+  }
   fail_unless (gst_element_sync_state_with_parent (tp->bin));
 }
 
 static HttpSrcTestDownloader *
-test_curl_http_src_create_downloader (const gchar * name, guint64 delay)
+test_curl_http_src_downloader_new (const gchar * name, guint64 delay)
 {
   HttpSrcTestDownloader *tp;
   gchar *url;
@@ -504,6 +652,8 @@ test_curl_http_src_create_downloader (const gchar * name, guint64 delay)
   tp->server = run_server ();
   fail_if (tp->server == NULL, "Failed to start up HTTP server");
   tp->server->delay = delay;
+  tp->start_position = 0;
+  tp->stop_position = -1;
 
   tp->src = gst_element_factory_make ("curlhttpsrc", NULL);
   fail_unless (tp->src != NULL);
@@ -534,6 +684,14 @@ test_curl_http_src_create_downloader (const gchar * name, guint64 delay)
   return tp;
 }
 
+static void
+test_curl_http_src_downloader_free (HttpSrcTestDownloader * downloader)
+{
+  gst_element_set_state (downloader->bin, GST_STATE_NULL);
+  stop_server (downloader->server);
+  g_slice_free (HttpSrcTestDownloader, downloader);
+}
+
 typedef struct _MultipleHttpRequestsContext
 {
   GMainLoop *loop;
@@ -562,29 +720,25 @@ bus_message (GstBus * bus, GstMessage * msg, gpointer user_data)
           GST_MESSAGE_SRC (msg) == GST_OBJECT (context->pipe)) {
         GST_DEBUG ("Test ready to start");
         start_next_download (context->downloader1);
-        start_next_download (context->downloader2);
+        if (context->downloader2)
+          start_next_download (context->downloader2);
       } else if (newstate == GST_STATE_READY
           && pending == GST_STATE_VOID_PENDING) {
         if (GST_MESSAGE_SRC (msg) == GST_OBJECT (context->downloader1->bin)) {
           if (++context->downloader1->count < 20) {
             start_next_download (context->downloader1);
-          } else {
-            gst_element_set_state (context->downloader1->bin, GST_STATE_NULL);
-            if (context->downloader2->count == 20) {
-              g_main_loop_quit (context->loop);
-            }
           }
-        } else if (GST_MESSAGE_SRC (msg) ==
+        } else if (context->downloader2 && GST_MESSAGE_SRC (msg) ==
             GST_OBJECT (context->downloader2->bin)) {
           if (++context->downloader2->count < 20) {
             start_next_download (context->downloader2);
-          } else {
-            gst_element_set_state (context->downloader2->bin, GST_STATE_NULL);
-            if (context->downloader1->count == 20) {
-              g_main_loop_quit (context->loop);
-            }
           }
         }
+        if (context->downloader1->count == 20 &&
+            (context->downloader2 == NULL
+                || context->downloader2->count == 20)) {
+          g_main_loop_quit (context->loop);
+        }
       }
       break;
     case GST_MESSAGE_ERROR:
@@ -602,8 +756,8 @@ bus_message (GstBus * bus, GstMessage * msg, gpointer user_data)
       g_main_loop_quit (context->loop);
       break;
     case GST_MESSAGE_EOS:
-      if (context->downloader1->count == 20
-          && context->downloader2->count == 20) {
+      if (context->downloader1->count == 20 &&
+          (context->downloader2 == NULL || context->downloader2->count == 20)) {
         g_main_loop_quit (context->loop);
       }
       break;
@@ -629,10 +783,10 @@ GST_START_TEST (test_multiple_http_requests)
 
   context.loop = g_main_loop_new (NULL, FALSE);
   context.downloader1 =
-      test_curl_http_src_create_downloader ("bin1", 5 * G_USEC_PER_SEC / 1000);
+      test_curl_http_src_downloader_new ("bin1", 5 * G_USEC_PER_SEC / 1000);
   fail_unless (context.downloader1 != NULL);
   context.downloader2 =
-      test_curl_http_src_create_downloader ("bin2", 7 * G_USEC_PER_SEC / 1000);
+      test_curl_http_src_downloader_new ("bin2", 7 * G_USEC_PER_SEC / 1000);
   fail_unless (context.downloader2 != NULL);
 
   context.pipe = gst_pipeline_new (NULL);
@@ -652,12 +806,82 @@ GST_START_TEST (test_multiple_http_requests)
 
   g_main_loop_run (context.loop);
   g_source_remove (watch_id);
+  test_curl_http_src_downloader_free (context.downloader1);
+  test_curl_http_src_downloader_free (context.downloader2);
+  gst_element_set_state (context.pipe, GST_STATE_NULL);
+  gst_object_unref (context.pipe);
+  g_main_loop_unref (context.loop);
+}
+
+GST_END_TEST;
+
+typedef struct _DataProbeResult
+{
+  guint64 received;
+} DataProbeResult;
+
+static GstPadProbeReturn
+src_data_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+  DataProbeResult *dpr = (DataProbeResult *) user_data;
+  GstBuffer *buf;
+
+  if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER) {
+    buf = GST_PAD_PROBE_INFO_BUFFER (info);
+    dpr->received += gst_buffer_get_size (buf);
+  }
+
+  return GST_PAD_PROBE_OK;
+}
+
+GST_START_TEST (test_range_get)
+{
+  GstStateChangeReturn ret;
+  MultipleHttpRequestsContext context;
+  guint watch_id;
+  GstBus *bus;
+  GstPad *src_pad;
+  gulong probe_id;
+  DataProbeResult dpr;
+
+  context.loop = g_main_loop_new (NULL, FALSE);
+  context.downloader1 =
+      test_curl_http_src_downloader_new ("bin1", 5 * G_USEC_PER_SEC / 1000);
+  fail_unless (context.downloader1 != NULL);
+  context.downloader1->start_position = 128;
+  context.downloader1->stop_position = 255;
+  src_pad = gst_element_get_static_pad (context.downloader1->src, "src");
+  fail_unless (src_pad != NULL);
+  dpr.received = 0;
+  probe_id = gst_pad_add_probe (src_pad, GST_PAD_PROBE_TYPE_BUFFER,
+      src_data_probe, &dpr, NULL);
+  fail_unless (probe_id > 0);
+  context.downloader2 = NULL;
+
+  context.pipe = gst_pipeline_new (NULL);
+  fail_unless (context.pipe != NULL);
+
+  gst_bin_add (GST_BIN_CAST (context.pipe), context.downloader1->bin);
+
+  bus = gst_pipeline_get_bus (GST_PIPELINE (context.pipe));
+  watch_id = gst_bus_add_watch (bus, bus_message, &context);
+  gst_object_unref (bus);
+
+  GST_DEBUG ("Start pipeline playing");
+  ret = gst_element_set_state (context.pipe, GST_STATE_PLAYING);
+  fail_unless (ret == GST_STATE_CHANGE_ASYNC
+      || ret == GST_STATE_CHANGE_SUCCESS);
+
+  g_main_loop_run (context.loop);
+  fail_unless_equals_uint64 (dpr.received,
+      1 + context.downloader1->stop_position -
+      context.downloader1->start_position);
+  g_source_remove (watch_id);
+  gst_pad_remove_probe (src_pad, probe_id);
+  gst_object_unref (src_pad);
+  test_curl_http_src_downloader_free (context.downloader1);
   gst_element_set_state (context.pipe, GST_STATE_NULL);
   gst_object_unref (context.pipe);
-  stop_server (context.downloader1->server);
-  stop_server (context.downloader2->server);
-  g_slice_free (HttpSrcTestDownloader, context.downloader1);
-  g_slice_free (HttpSrcTestDownloader, context.downloader2);
   g_main_loop_unref (context.loop);
 }
 
@@ -686,6 +910,7 @@ curlhttpsrc_suite (void)
   tcase_add_test (tc_chain, test_forbidden);
   tcase_add_test (tc_chain, test_cookies);
   tcase_add_test (tc_chain, test_multiple_http_requests);
+  tcase_add_test (tc_chain, test_range_get);
 
   return s;
 }