static gboolean gst_curl_http_src_query (GstBaseSrc * bsrc, GstQuery * query);
static gboolean gst_curl_http_src_get_content_length (GstBaseSrc * bsrc,
guint64 * size);
+static gboolean gst_curl_http_src_is_seekable (GstBaseSrc * bsrc);
+static gboolean gst_curl_http_src_do_seek (GstBaseSrc * bsrc,
+ GstSegment * segment);
static gboolean gst_curl_http_src_unlock (GstBaseSrc * bsrc);
static gboolean gst_curl_http_src_unlock_stop (GstBaseSrc * bsrc);
gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_curl_http_src_query);
gstbasesrc_class->get_size =
GST_DEBUG_FUNCPTR (gst_curl_http_src_get_content_length);
+ gstbasesrc_class->is_seekable =
+ GST_DEBUG_FUNCPTR (gst_curl_http_src_is_seekable);
+ gstbasesrc_class->do_seek = GST_DEBUG_FUNCPTR (gst_curl_http_src_do_seek);
gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_http_src_unlock);
gstbasesrc_class->unlock_stop =
GST_DEBUG_FUNCPTR (gst_curl_http_src_unlock_stop);
source->retries_remaining = source->total_retries;
source->slist = NULL;
source->accept_compressed_encodings = FALSE;
+ source->seekable = GSTCURL_SEEKABLE_UNKNOWN;
+ source->content_size = 0;
+ source->request_position = 0;
+ source->stop_position = -1;
gst_base_src_set_automatic_eos (GST_BASE_SRC (source), FALSE);
gst_curl_setopt_bool (s, handle, CURLOPT_SSL_VERIFYPEER, s->strict_ssl);
gst_curl_setopt_str (s, handle, CURLOPT_CAINFO, s->custom_ca_file);
+ if (s->request_position || s->stop_position > 0) {
+ gchar *range;
+ if (s->stop_position < 1) {
+ /* start specified, no end specified */
+ range = g_strdup_printf ("%" G_GINT64_FORMAT "-", s->request_position);
+ } else {
+ /* in GStreamer the end position indicates the first byte that is not
+ in the range, whereas in HTTP the Content-Range header includes the
+ byte listed in the end value */
+ range = g_strdup_printf ("%" G_GINT64_FORMAT "-%" G_GINT64_FORMAT,
+ s->request_position, s->stop_position - 1);
+ }
+ GST_TRACE_OBJECT (s, "Requesting range: %s", range);
+ curl_easy_setopt (handle, CURLOPT_RANGE, range);
+ g_free (range);
+ }
+
switch (s->preferred_http_version) {
case GSTCURL_HTTP_VERSION_1_0:
GST_DEBUG_OBJECT (s, "Setting version as HTTP/1.0");
{
glong curl_info_long;
gdouble curl_info_dbl;
+ curl_off_t curl_info_offt;
gchar *redirect_url;
GstBaseSrc *basesrc;
const GValue *response_headers;
/*
* Push the content length
*/
- if (curl_easy_getinfo (src->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD,
- &curl_info_dbl) == CURLE_OK) {
- if (curl_info_dbl == -1) {
+ if (curl_easy_getinfo (src->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T,
+ &curl_info_offt) == CURLE_OK) {
+ if (curl_info_offt == -1) {
GST_WARNING_OBJECT (src,
"No Content-Length was specified in the response.");
+ src->seekable = GSTCURL_SEEKABLE_FALSE;
} else {
- GST_INFO_OBJECT (src, "Content-Length was given as %.0f", curl_info_dbl);
+ /* Note that in the case of a range get, Content-Length is the number
+ of bytes requested, not the total size of the resource */
+ GST_INFO_OBJECT (src, "Content-Length was given as %" G_GUINT64_FORMAT,
+ curl_info_offt);
+ if (src->content_size == 0) {
+ src->content_size = src->request_position + curl_info_offt;
+ }
basesrc = GST_BASE_SRC_CAST (src);
- basesrc->segment.duration = curl_info_dbl;
+ basesrc->segment.duration = src->request_position + curl_info_offt;
+ if (src->seekable == GSTCURL_SEEKABLE_UNKNOWN) {
+ src->seekable = GSTCURL_SEEKABLE_TRUE;
+ }
gst_element_post_message (GST_ELEMENT (src),
gst_message_new_duration_changed (GST_OBJECT (src)));
}
return ret;
}
+static gboolean
+gst_curl_http_src_is_seekable (GstBaseSrc * bsrc)
+{
+ GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
+
+ /* NOTE: if seekable is UNKNOWN, assume yes */
+ return src->seekable != GSTCURL_SEEKABLE_FALSE;
+}
+
+static gboolean
+gst_curl_http_src_do_seek (GstBaseSrc * bsrc, GstSegment * segment)
+{
+ GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
+ gboolean ret = TRUE;
+
+ g_mutex_lock (&src->buffer_mutex);
+ GST_DEBUG_OBJECT (src, "do_seek(%" G_GINT64_FORMAT ", %" G_GINT64_FORMAT
+ ")", segment->start, segment->stop);
+ if (src->state == GSTCURL_UNLOCK) {
+ GST_WARNING_OBJECT (src, "Attempt to seek while unlocked");
+ ret = FALSE;
+ goto done;
+ }
+ if (src->request_position == segment->start &&
+ src->stop_position == segment->stop) {
+ GST_DEBUG_OBJECT (src, "Seek to current read/end position");
+ goto done;
+ }
+
+ if (src->seekable == GSTCURL_SEEKABLE_FALSE) {
+ GST_WARNING_OBJECT (src, "Not seekable");
+ ret = FALSE;
+ goto done;
+ }
+
+ if (segment->rate < 0.0 || segment->format != GST_FORMAT_BYTES) {
+ GST_WARNING_OBJECT (src, "Invalid seek segment");
+ ret = FALSE;
+ goto done;
+ }
+
+ if (src->content_size > 0 && segment->start >= src->content_size) {
+ GST_WARNING_OBJECT (src,
+ "Potentially seeking beyond end of file, might EOS immediately");
+ }
+
+ src->request_position = segment->start;
+ src->stop_position = segment->stop;
+done:
+ g_mutex_unlock (&src->buffer_mutex);
+ return ret;
+}
+
static void
gst_curl_http_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
{
/* We have some special cases - deal with them here */
if (g_strcmp0 (header_key, "content-type") == 0) {
gst_curl_http_src_negotiate_caps (src);
+ } else if (g_strcmp0 (header_key, "accept-ranges") == 0 &&
+ g_ascii_strcasecmp (header_value, "none") == 0) {
+ s->seekable = GSTCURL_SEEKABLE_FALSE;
+ } else if (g_strcmp0 (header_key, "content-range") == 0) {
+ /* In the case of a Range GET, the Content-Length header will contain
+ the size of range requested, and the Content-Range header will
+ have the start, stop and total size of the resource */
+ gchar *size = strchr (header_value, '/');
+ if (size) {
+ s->content_size = atoi (size);
+ }
}
g_free (header_key);
guint64 delay;
} GioHttpServer;
+typedef struct _HttpHeader
+{
+ gchar *header;
+ gchar *value;
+} HttpHeader;
+
typedef struct _HttpRequest
{
gchar *method;
gchar *version;
gchar *path;
gchar *query;
+ gint64 range_start;
+ gint64 range_stop;
+ GSList *headers;
} HttpRequest;
static GioHttpServer *run_server (void);
static guint16 get_port_from_server (GioHttpServer * server);
static const gchar *STATUS_OK = "200 OK";
+static const gchar *STATUS_PARTIAL_CONTENT = "206 OK";
static const gchar *STATUS_MOVED_PERMANENTLY = "301 Moved Permanently";
static const gchar *STATUS_MOVED_TEMPORARILY = "302 Moved Temporarily";
static const gchar *STATUS_TEMPORARY_REDIRECT = "307 Temporary Redirect";
static const gchar *STATUS_FORBIDDEN = "403 Forbidden";
static const gchar *STATUS_NOT_FOUND = "404 Not Found";
+static const guint64 http_content_length = G_GUINT64_CONSTANT (1024);
+
static void
do_get (GioHttpServer * server, const HttpRequest * req, GOutputStream * out)
{
gboolean send_error_doc = FALSE;
- int buflen = 1024;
const gchar *status = STATUS_OK;
const gchar *content_type = "application/octet-stream";
+ guint64 buflen;
GString *s;
- char *buf = NULL;
+ gpointer *buf = NULL;
gsize written = 0;
GST_DEBUG ("%s request: \"%s\"", req->method, req->path);
status = STATUS_NOT_FOUND;
send_error_doc = TRUE;
}
+ if (g_strcmp0 (req->method, "GET") == 0 &&
+ (req->range_start > 0 || req->range_stop >= 0)) {
+ status = STATUS_PARTIAL_CONTENT;
+ }
s = g_string_new ("HTTP/");
g_string_append_printf (s, "%s %s\r\n", req->version, status);
g_string_append_printf (s, "Location: %s-redirected\r\n", req->path);
}
- if (status == STATUS_OK || send_error_doc) {
+ if (g_strcmp0 (req->method, "GET") == 0
+ || g_strcmp0 (req->method, "HEAD") == 0) {
+ g_string_append_printf (s, "Accept-Ranges: bytes\r\n");
+ }
+ if (status == STATUS_OK || status == STATUS_PARTIAL_CONTENT || send_error_doc) {
g_string_append_printf (s, "Content-Type: %s\r\n", content_type);
- g_string_append_printf (s, "Content-Length: %lu\r\n", (gulong) buflen);
- if (!g_strcmp0 (req->method, "GET")) {
- buf = g_malloc (buflen);
- memset (buf, 0, buflen);
+ buflen = http_content_length;
+ if (req->range_start > 0 && req->range_stop >= 0) {
+ buflen = 1 + MIN (req->range_stop, buflen - 1) - req->range_start;
+ } else if (req->range_start > 0) {
+ buflen = buflen - req->range_start;
+ } else if (req->range_stop >= 0) {
+ buflen = 1 + MIN (req->range_stop, buflen - 1);
+ }
+ if (buflen != http_content_length) {
+ g_string_append_printf (s, "Content-Range: bytes %" G_GINT64_FORMAT "-%"
+ G_GINT64_FORMAT "/%" G_GUINT64_FORMAT "\r\n",
+ req->range_start,
+ req->range_stop >= 0 ? req->range_stop : (http_content_length - 1),
+ http_content_length);
}
+ GST_TRACE ("buflen = %" G_GUINT64_FORMAT " range = %" G_GINT64_FORMAT
+ " -> %" G_GINT64_FORMAT, buflen, req->range_start, req->range_stop);
+ buf = g_malloc (buflen);
+ memset (buf, 0, buflen);
+ g_string_append_printf (s, "Content-Length: %" G_GUINT64_FORMAT "\r\n",
+ buflen);
}
g_string_append (s, "\r\n");
g_free (res);
}
+static HttpHeader *
+http_header_new (const gchar * header, const gchar * value)
+{
+ HttpHeader *ret;
+
+ ret = g_slice_new (HttpHeader);
+ ret->header = g_strdup (header);
+ ret->value = g_strdup (value);
+ return ret;
+}
+
+static void
+http_header_free (HttpHeader * header)
+{
+ if (header) {
+ g_free (header->header);
+ g_free (header->value);
+ g_slice_free (HttpHeader, header);
+ }
+}
+
+static HttpRequest *
+http_request_new (const gchar * method, const gchar * version,
+ const gchar * path, const gchar * query)
+{
+ HttpRequest *req;
+
+ req = g_slice_new0 (HttpRequest);
+ req->method = g_strdup (method);
+ if (version)
+ req->version = g_strdup (version);
+ req->path = g_uri_unescape_string (path, NULL);
+ if (query)
+ req->query = g_strdup (query);
+ req->range_start = 0;
+ req->range_stop = -1;
+ return req;
+}
+
+static void
+http_request_free (HttpRequest * req)
+{
+ if (!req)
+ return;
+ g_free (req->method);
+ g_free (req->version);
+ g_free (req->path);
+ g_free (req->query);
+ if (req->headers)
+ g_slist_free_full (req->headers, (GDestroyNotify) http_header_free);
+ g_slice_free (HttpRequest, req);
+}
+
static gboolean
server_callback (GThreadedSocketService * service,
GSocketConnection * connection,
GOutputStream *out;
GInputStream *in;
GDataInputStream *data = NULL;
- char *line = NULL, *escaped, *tmp;
- HttpRequest req;
+ gchar *line = NULL, *escaped, *tmp;
+ HttpRequest *req = NULL;
+ gboolean done = FALSE;
+ gchar *version = NULL, *query;
in = g_io_stream_get_input_stream (G_IO_STREAM (connection));
out = g_io_stream_get_output_stream (G_IO_STREAM (connection));
send_error (out, 400, "Invalid request");
goto out;
}
- req.method = line;
*tmp = '\0';
escaped = tmp + 1;
- req.version = NULL;
tmp = strchr (escaped, ' ');
if (tmp != NULL) {
*tmp = 0;
- req.version = tmp + 6; /* skip "HTTP/" from version field */
+ version = tmp + 6; /* skip "HTTP/" from version field */
}
- req.query = strchr (escaped, '?');
- if (req.query != NULL) {
- *req.query = '\0';
- req.query++;
+ query = strchr (escaped, '?');
+ if (query != NULL) {
+ *query = '\0';
+ query++;
}
- req.path = g_uri_unescape_string (escaped, NULL);
+ req = http_request_new (line, version, escaped, query);
- GST_DEBUG ("%s %s HTTP/%s", req.method, req.path, req.version);
+ GST_TRACE ("%s %s HTTP/%s", req->method, req->path, req->version);
+ while (!done) {
+ g_free (line);
+ line = g_data_input_stream_read_line (data, NULL, NULL, NULL);
+ if (!line) {
+ send_error (out, 400, "Invalid request");
+ goto out;
+ }
+ tmp = strchr (line, ':');
+ if (!tmp) {
+ /* reached end of HTTP request headers */
+ done = TRUE;
+ continue;
+ }
+ *tmp = '\0';
+ do {
+ ++tmp;
+ } while (*tmp == ' ');
+ GST_TRACE ("Request header: %s: %s", line, tmp);
+ req->headers = g_slist_append (req->headers, http_header_new (line, tmp));
+ if (g_ascii_strcasecmp (line, "range") == 0) {
+ gchar *start, *end;
+ start = strchr (tmp, '=');
+ if (!start) {
+ GST_ERROR ("Invalid range request: %s", tmp);
+ send_error (out, 400, "Invalid request");
+ goto out;
+ }
+ start++;
+ end = strchr (start, '-');
+ if (!end) {
+ GST_ERROR ("Invalid range request: %s", tmp);
+ send_error (out, 400, "Invalid request");
+ goto out;
+ }
+ *end = '\0';
+ end++;
+ if (*start != '\0') {
+ req->range_start = atoi (start);
+ }
+ if (*end != '\0') {
+ req->range_stop = atoi (end);
+ }
+ GST_DEBUG ("RANGE request %" G_GINT64_FORMAT " -> %" G_GINT64_FORMAT,
+ req->range_start, req->range_stop);
+ }
+ }
if (server->delay) {
g_usleep (server->delay);
}
- do_get (server, &req, out);
+ do_get (server, req, out);
- g_free (req.path);
out:
g_free (line);
+ http_request_free (req);
if (data)
g_object_unref (data);
gint rc = -1;
const GstStructure *details = NULL;
- fail_unless (has_error);
gst_message_parse_error (msg, &err, &debug);
gst_message_parse_error_details (msg, &details);
GST_DEBUG ("debug object: %s", debug);
}
g_error_free (err);
g_free (debug);
+ fail_unless (has_error);
GST_DEBUG ("Got HTTP error %d, expected_status_code %d", rc,
expected_status_code);
res = (rc == expected_status_code);
GstElement *sink;
GioHttpServer *server;
guint count;
+ gint64 start_position;
+ gint64 stop_position;
} HttpSrcTestDownloader;
static gboolean
GST_DEBUG_OBJECT (tp->bin, "Start next request for: %s", url);
g_object_set (tp->src, "location", url, NULL);
g_free (url);
+ if (tp->start_position != 0 || tp->stop_position != -1) {
+ /* Send the seek event to the uri_handler, as the other pipeline elements
+ * can't handle it when READY. */
+ GST_DEBUG ("Range get %" G_GINT64_FORMAT " -> %" G_GINT64_FORMAT,
+ tp->start_position, tp->stop_position);
+ fail_if (!gst_element_send_event (tp->src, gst_event_new_seek (1.0,
+ GST_FORMAT_BYTES, (GstSeekFlags) GST_SEEK_FLAG_FLUSH,
+ GST_SEEK_TYPE_SET, tp->start_position, GST_SEEK_TYPE_SET,
+ tp->stop_position + 1)),
+ "Source element can't handle range requests");
+ }
fail_unless (gst_element_sync_state_with_parent (tp->bin));
}
static HttpSrcTestDownloader *
-test_curl_http_src_create_downloader (const gchar * name, guint64 delay)
+test_curl_http_src_downloader_new (const gchar * name, guint64 delay)
{
HttpSrcTestDownloader *tp;
gchar *url;
tp->server = run_server ();
fail_if (tp->server == NULL, "Failed to start up HTTP server");
tp->server->delay = delay;
+ tp->start_position = 0;
+ tp->stop_position = -1;
tp->src = gst_element_factory_make ("curlhttpsrc", NULL);
fail_unless (tp->src != NULL);
return tp;
}
+static void
+test_curl_http_src_downloader_free (HttpSrcTestDownloader * downloader)
+{
+ gst_element_set_state (downloader->bin, GST_STATE_NULL);
+ stop_server (downloader->server);
+ g_slice_free (HttpSrcTestDownloader, downloader);
+}
+
typedef struct _MultipleHttpRequestsContext
{
GMainLoop *loop;
GST_MESSAGE_SRC (msg) == GST_OBJECT (context->pipe)) {
GST_DEBUG ("Test ready to start");
start_next_download (context->downloader1);
- start_next_download (context->downloader2);
+ if (context->downloader2)
+ start_next_download (context->downloader2);
} else if (newstate == GST_STATE_READY
&& pending == GST_STATE_VOID_PENDING) {
if (GST_MESSAGE_SRC (msg) == GST_OBJECT (context->downloader1->bin)) {
if (++context->downloader1->count < 20) {
start_next_download (context->downloader1);
- } else {
- gst_element_set_state (context->downloader1->bin, GST_STATE_NULL);
- if (context->downloader2->count == 20) {
- g_main_loop_quit (context->loop);
- }
}
- } else if (GST_MESSAGE_SRC (msg) ==
+ } else if (context->downloader2 && GST_MESSAGE_SRC (msg) ==
GST_OBJECT (context->downloader2->bin)) {
if (++context->downloader2->count < 20) {
start_next_download (context->downloader2);
- } else {
- gst_element_set_state (context->downloader2->bin, GST_STATE_NULL);
- if (context->downloader1->count == 20) {
- g_main_loop_quit (context->loop);
- }
}
}
+ if (context->downloader1->count == 20 &&
+ (context->downloader2 == NULL
+ || context->downloader2->count == 20)) {
+ g_main_loop_quit (context->loop);
+ }
}
break;
case GST_MESSAGE_ERROR:
g_main_loop_quit (context->loop);
break;
case GST_MESSAGE_EOS:
- if (context->downloader1->count == 20
- && context->downloader2->count == 20) {
+ if (context->downloader1->count == 20 &&
+ (context->downloader2 == NULL || context->downloader2->count == 20)) {
g_main_loop_quit (context->loop);
}
break;
context.loop = g_main_loop_new (NULL, FALSE);
context.downloader1 =
- test_curl_http_src_create_downloader ("bin1", 5 * G_USEC_PER_SEC / 1000);
+ test_curl_http_src_downloader_new ("bin1", 5 * G_USEC_PER_SEC / 1000);
fail_unless (context.downloader1 != NULL);
context.downloader2 =
- test_curl_http_src_create_downloader ("bin2", 7 * G_USEC_PER_SEC / 1000);
+ test_curl_http_src_downloader_new ("bin2", 7 * G_USEC_PER_SEC / 1000);
fail_unless (context.downloader2 != NULL);
context.pipe = gst_pipeline_new (NULL);
g_main_loop_run (context.loop);
g_source_remove (watch_id);
+ test_curl_http_src_downloader_free (context.downloader1);
+ test_curl_http_src_downloader_free (context.downloader2);
+ gst_element_set_state (context.pipe, GST_STATE_NULL);
+ gst_object_unref (context.pipe);
+ g_main_loop_unref (context.loop);
+}
+
+GST_END_TEST;
+
+typedef struct _DataProbeResult
+{
+ guint64 received;
+} DataProbeResult;
+
+static GstPadProbeReturn
+src_data_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+ DataProbeResult *dpr = (DataProbeResult *) user_data;
+ GstBuffer *buf;
+
+ if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER) {
+ buf = GST_PAD_PROBE_INFO_BUFFER (info);
+ dpr->received += gst_buffer_get_size (buf);
+ }
+
+ return GST_PAD_PROBE_OK;
+}
+
+GST_START_TEST (test_range_get)
+{
+ GstStateChangeReturn ret;
+ MultipleHttpRequestsContext context;
+ guint watch_id;
+ GstBus *bus;
+ GstPad *src_pad;
+ gulong probe_id;
+ DataProbeResult dpr;
+
+ context.loop = g_main_loop_new (NULL, FALSE);
+ context.downloader1 =
+ test_curl_http_src_downloader_new ("bin1", 5 * G_USEC_PER_SEC / 1000);
+ fail_unless (context.downloader1 != NULL);
+ context.downloader1->start_position = 128;
+ context.downloader1->stop_position = 255;
+ src_pad = gst_element_get_static_pad (context.downloader1->src, "src");
+ fail_unless (src_pad != NULL);
+ dpr.received = 0;
+ probe_id = gst_pad_add_probe (src_pad, GST_PAD_PROBE_TYPE_BUFFER,
+ src_data_probe, &dpr, NULL);
+ fail_unless (probe_id > 0);
+ context.downloader2 = NULL;
+
+ context.pipe = gst_pipeline_new (NULL);
+ fail_unless (context.pipe != NULL);
+
+ gst_bin_add (GST_BIN_CAST (context.pipe), context.downloader1->bin);
+
+ bus = gst_pipeline_get_bus (GST_PIPELINE (context.pipe));
+ watch_id = gst_bus_add_watch (bus, bus_message, &context);
+ gst_object_unref (bus);
+
+ GST_DEBUG ("Start pipeline playing");
+ ret = gst_element_set_state (context.pipe, GST_STATE_PLAYING);
+ fail_unless (ret == GST_STATE_CHANGE_ASYNC
+ || ret == GST_STATE_CHANGE_SUCCESS);
+
+ g_main_loop_run (context.loop);
+ fail_unless_equals_uint64 (dpr.received,
+ 1 + context.downloader1->stop_position -
+ context.downloader1->start_position);
+ g_source_remove (watch_id);
+ gst_pad_remove_probe (src_pad, probe_id);
+ gst_object_unref (src_pad);
+ test_curl_http_src_downloader_free (context.downloader1);
gst_element_set_state (context.pipe, GST_STATE_NULL);
gst_object_unref (context.pipe);
- stop_server (context.downloader1->server);
- stop_server (context.downloader2->server);
- g_slice_free (HttpSrcTestDownloader, context.downloader1);
- g_slice_free (HttpSrcTestDownloader, context.downloader2);
g_main_loop_unref (context.loop);
}
tcase_add_test (tc_chain, test_forbidden);
tcase_add_test (tc_chain, test_cookies);
tcase_add_test (tc_chain, test_multiple_http_requests);
+ tcase_add_test (tc_chain, test_range_get);
return s;
}