2 * Copyright (C) 2021-2022 Jan Schmidt <jan@centricular.com>
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
22 #include "downloadhelper.h"
23 #include "../soup/gstsouploader.h"
/* Log through the adaptivedemux2 debug category, which is declared extern
 * here and made the default category for the GST_* logging macros below. */
25 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
26 #define GST_CAT_DEFAULT adaptivedemux2_debug
/* Default size, in bytes, of each read buffer allocated by
 * new_read_buffer(). */
28 #define CHUNK_BUFFER_SIZE 32768
/* Per-transfer state attached to each transfer GTask as its task data. */
30 typedef struct DownloadHelperTransfer DownloadHelperTransfer;
/* Thread that runs the transfer main loop, created in downloadhelper_start()
 * and joined in downloadhelper_stop(). */
34 GThread *transfer_thread;
/* Clock used to timestamp request submission and download start / end. */
38 GstAdaptiveDemuxClock *clock;
/* Main context on which the transfer loop, the idle sources and the async
 * read callbacks are dispatched. */
40 GMainContext *transfer_context;
/* Array of GTask * for transfers that have been submitted. Removing an
 * element runs release_transfer_task_by_ref(), which unrefs the task. */
45 GArray *active_transfers;
/* Queue of transfer tasks waiting to be submitted from the transfer loop,
 * and the idle source (NULL when none is pending) that drains it via
 * submit_transfers_cb(). */
47 GAsyncQueue *transfer_requests;
48 GSource *transfer_requests_source;
/* State for one HTTP transfer. An instance is allocated by
 * transfer_task_new(), stored as the task data of the transfer GTask and
 * released by free_transfer(). */
55 struct DownloadHelperTransfer
/* TRUE while a progress callback has been scheduled by
 * transfer_task_report_progress() and has not run yet. */
61 gboolean progress_pending;
/* Shared by the GTask, the libsoup send and the async body reads, so
 * cancelling it aborts whichever stage is in flight. */
65 GCancellable *cancellable;
68 gboolean request_sent;
70 /* Current read buffer */
72 guint64 read_buffer_size;
73 guint64 read_position; /* Start in bytes of the read_buffer */
/* The request being serviced. The transfer holds its own reference, taken
 * in transfer_task_new() and dropped in free_transfer(). */
75 DownloadRequest *request;
/* Destroy notify for the task data of a transfer GTask (registered in
 * transfer_task_new()). Releases what the transfer holds: its reference on
 * the request, the condition variable of a blocking transfer, the
 * SoupMessage and any read buffer the transfer still owns. */
79 free_transfer (DownloadHelperTransfer * transfer)
81 DownloadRequest *request = transfer->request;
84 download_request_unref (request);
/* The GCond is only initialised for blocking transfers, so only clear it
 * for those. */
86 if (transfer->blocking)
87 g_cond_clear (&transfer->cond);
89 g_object_unref (transfer->msg);
/* read_buffer is NULL once a buffer has been handed to a GstBuffer;
 * g_free() accepts NULL. */
90 g_free (transfer->read_buffer);
/* Ready callback of the transfer GTask (installed by transfer_task_new()).
 *
 * @res is the GTask itself; its task data is the DownloadHelperTransfer.
 * For a non-blocking transfer this clears request->in_use and despatches the
 * request's completion, all under the request lock. A blocking transfer
 * returns immediately without despatching anything. */
95 transfer_completion_cb (gpointer src_object, GAsyncResult * res,
98 DownloadHelperTransfer *transfer = g_task_get_task_data ((GTask *) res);
99 DownloadRequest *request = transfer->request;
101 if (transfer->blocking)
102 return; /* Somehow a completion got signalled for a blocking request */
104 download_request_lock (request);
105 request->in_use = FALSE;
106 GST_LOG ("Despatching completion for transfer %p request %p", transfer,
108 download_request_despatch_completion (request);
109 download_request_unlock (request);
/* Source callback scheduled by transfer_task_report_progress().
 *
 * @task is the transfer GTask. If the request asked for progress reports,
 * one is despatched with the request lock held. progress_pending is then
 * cleared so that a later call to transfer_task_report_progress() can
 * schedule a new report. */
113 transfer_report_progress_cb (gpointer task)
115 DownloadHelperTransfer *transfer;
116 DownloadRequest *request;
118 /* Already completed - late callback */
119 if (g_task_get_completed (task))
122 transfer = g_task_get_task_data (task);
123 request = transfer->request;
125 download_request_lock (request);
126 if (request->send_progress) {
127 GST_LOG ("Despatching progress for transfer %p request %p", transfer,
129 download_request_despatch_progress (request);
131 transfer->progress_pending = FALSE;
132 download_request_unlock (request);
/* Creates the GTask that represents one transfer.
 *
 * A zero-initialised DownloadHelperTransfer is allocated and becomes the
 * task data, with free_transfer() as its destroy notify. The transfer takes
 * its own reference on @request and gets a new GCancellable that is also the
 * task's cancellable. transfer_completion_cb() is the task's ready callback.
 * The condition variable is initialised only when @blocking is TRUE.
 *
 * NOTE(review): free_transfer() unrefs transfer->msg and the read callbacks
 * use transfer->dh, so @dh and @msg are presumably stored in the transfer
 * here and ownership of @msg passes to it - confirm.
 *
 * Returns: the new transfer task. */
138 transfer_task_new (DownloadHelper * dh, DownloadRequest * request,
139 SoupMessage * msg, gboolean blocking)
141 GTask *transfer_task = NULL;
142 DownloadHelperTransfer *transfer = g_new0 (DownloadHelperTransfer, 1);
144 transfer->blocking = blocking;
145 if (transfer->blocking)
146 g_cond_init (&transfer->cond);
148 transfer->cancellable = g_cancellable_new ();
149 transfer->request = download_request_ref (request);
155 g_task_new (NULL, transfer->cancellable,
156 (GAsyncReadyCallback) transfer_completion_cb, NULL);
157 g_task_set_task_data (transfer_task, transfer,
158 (GDestroyNotify) free_transfer);
160 return transfer_task;
/* Clear function for the active_transfers array (set in
 * downloadhelper_new()). GArray passes a pointer to the element being
 * removed, hence the GTask ** argument; the task reference held by the
 * element is dropped. */
164 release_transfer_task_by_ref (GTask ** transfer_task)
166 g_object_unref (*transfer_task);
/* Schedules a progress notification for the transfer behind @transfer_task.
 *
 * Nothing is scheduled when a report is already pending or when the request
 * did not ask for progress. Otherwise progress_pending is set and an idle
 * source running transfer_report_progress_cb() is attached to the task. */
169 /* Called with download_request lock held */
171 transfer_task_report_progress (GTask * transfer_task)
173 DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
174 DownloadRequest *request = transfer->request;
175 GSource *idle_source;
177 if (transfer->progress_pending == TRUE || !request->send_progress)
180 /* There's no progress cb pending and this download wants reports, so
181 * attach an idle source */
182 transfer->progress_pending = TRUE;
183 idle_source = g_idle_source_new ();
184 g_task_attach_source (transfer_task, idle_source,
185 transfer_report_progress_cb);
/* The task keeps the source alive; drop the local reference. */
186 g_source_unref (idle_source);
/* Completes @transfer_task and removes it from dh->active_transfers.
 *
 * The list is searched under dh->transfer_lock. When the task is found the
 * transfer is marked complete; a blocking transfer has its waiter woken via
 * the condition variable, while a non-blocking one has the task returned,
 * with @error when one is supplied. g_task_return_error() takes ownership
 * of the GError. Removing the array element drops the list's task ref.
 *
 * The warning at the end reports a task that was not in the list. */
190 finish_transfer_task (DownloadHelper * dh, GTask * transfer_task,
195 g_mutex_lock (&dh->transfer_lock);
/* Search from the end of the array towards the start. */
196 for (i = dh->active_transfers->len - 1; i >= 0; i--) {
197 if (transfer_task == g_array_index (dh->active_transfers, GTask *, i)) {
198 DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
200 transfer->complete = TRUE;
202 if (transfer->blocking)
203 g_cond_broadcast (&transfer->cond);
204 else if (error != NULL)
205 g_task_return_error (transfer_task, error);
207 g_task_return_boolean (transfer_task, TRUE);
209 /* This drops the task ref: */
210 g_array_remove_index_fast (dh->active_transfers, i);
211 g_mutex_unlock (&dh->transfer_lock);
215 g_mutex_unlock (&dh->transfer_lock);
217 GST_WARNING ("Did not find transfer %p in the active transfer list",
/* Allocates the next read buffer for @transfer and records its size in
 * transfer->read_buffer_size. The default size is CHUNK_BUFFER_SIZE.
 *
 * As written, a request with an end of range (range_end != -1) gets special
 * handling: when read_position has reached range_end no buffer is allocated
 * and the buffer fields are cleared, and when fewer than a full chunk
 * remains the buffer is shrunk to the remaining byte count.
 * NOTE(review): the "+ 1" suggests range_end is an inclusive offset -
 * confirm, and confirm that this range handling is active in the build.
 *
 * Callers treat a false return as "stop reading" (see on_read_ready() and
 * on_request_sent()). */
222 new_read_buffer (DownloadHelperTransfer * transfer)
224 gint buffer_size = CHUNK_BUFFER_SIZE;
226 DownloadRequest *request = transfer->request;
228 if (request->range_end != -1) {
229 if (request->range_end <= transfer->read_position) {
230 transfer->read_buffer = NULL;
231 transfer->read_buffer_size = 0;
234 if (request->range_end - transfer->read_position < buffer_size)
235 buffer_size = request->range_end - transfer->read_position + 1;
239 transfer->read_buffer = g_new (char, buffer_size);
240 transfer->read_buffer_size = buffer_size;
/* Completion callback for the g_input_stream_read_all_async() calls that
 * read the response body.
 *
 * @source: the GInputStream being read
 * @result: the async result to finish
 * @user_data: the transfer GTask
 *
 * On a read error the transfer is finished; the error is reported unless the
 * transfer was cancelled, in which case it is freed and ignored. Otherwise
 * the bytes read are wrapped in a GstBuffer, clipped against the start of
 * the requested range, added to the request, and another read is issued.
 * When there is nothing more to read the final request state is derived from
 * the HTTP status and the transfer is finished. */
245 on_read_ready (GObject * source, GAsyncResult * result, gpointer user_data)
247 GTask *transfer_task = user_data;
248 DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
250 DownloadHelper *dh = transfer->dh;
251 DownloadRequest *request = transfer->request;
253 GInputStream *in = G_INPUT_STREAM (source);
254 GError *error = NULL;
255 gsize bytes_read = 0;
257 GstClockTime now = gst_adaptive_demux_clock_get_time (dh->clock);
/* NOTE(review): g_input_stream_read_all_finish() returns TRUE on success,
 * so despite its name read_failed is TRUE when the read succeeded. The
 * "read_failed with 0 bytes" test further down therefore matches a
 * successful read that hit end of stream - confirm and consider renaming. */
259 gboolean read_failed =
260 g_input_stream_read_all_finish (in, result, &bytes_read, &error);
262 download_request_lock (request);
/* Error path: the pending read buffer is no longer needed. */
265 g_free (transfer->read_buffer);
266 transfer->read_buffer = NULL;
268 if (!g_cancellable_is_cancelled (transfer->cancellable)) {
269 GST_ERROR ("Failed to read stream: %s", error->message);
270 if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT)
271 request->state = DOWNLOAD_REQUEST_STATE_ERROR;
272 finish_transfer_task (dh, transfer_task, error);
274 /* Ignore error from cancelled operation */
275 g_error_free (error);
276 finish_transfer_task (dh, transfer_task, NULL);
278 download_request_unlock (request);
283 if (bytes_read > 0) {
/* gst_buffer_new_wrapped() takes ownership of the memory, so the transfer
 * forgets its pointer below instead of freeing it. */
284 GstBuffer *gst_buffer =
285 gst_buffer_new_wrapped (transfer->read_buffer, bytes_read);
287 GST_BUFFER_OFFSET (gst_buffer) = transfer->read_position;
288 transfer->read_position += bytes_read;
289 transfer->read_buffer = NULL;
291 /* Clip the buffer to within the range */
292 if (GST_BUFFER_OFFSET (gst_buffer) < request->range_start) {
293 if (transfer->read_position <= request->range_start) {
294 GST_DEBUG ("Discarding %" G_GSIZE_FORMAT
295 " bytes entirely before requested range",
296 gst_buffer_get_size (gst_buffer));
297 /* This buffer is completely before the range start, discard it */
298 gst_buffer_unref (gst_buffer);
301 GST_DEBUG ("Clipping first %" G_GINT64_FORMAT
302 " bytes before requested range",
303 request->range_start - GST_BUFFER_OFFSET (gst_buffer));
305 /* This buffer is partially within the requested range, clip the beginning */
306 gst_buffer_resize (gst_buffer,
307 request->range_start - GST_BUFFER_OFFSET (gst_buffer), -1);
308 GST_BUFFER_OFFSET (gst_buffer) = request->range_start;
/* The first data received marks the start of the download. */
312 if (request->download_start_time == GST_CLOCK_TIME_NONE) {
313 GST_LOG ("Got first data for URI %s", request->uri);
314 request->download_start_time = now;
317 if (gst_buffer != NULL) {
318 /* Unsent means cancellation is in progress, so don't override
319 * the state. Otherwise make sure it is LOADING */
320 if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT)
321 request->state = DOWNLOAD_REQUEST_STATE_LOADING;
323 GST_LOG ("Adding %u bytes to buffer",
324 (guint) (gst_buffer_get_size (gst_buffer)));
326 download_request_add_buffer (request, gst_buffer);
328 transfer_task_report_progress (transfer_task);
330 } else if (read_failed) {
331 /* The read failed and returned 0 bytes: We're done */
332 goto finish_transfer;
335 /* Resubmit the read request to get more */
336 if (!new_read_buffer (transfer))
337 goto finish_transfer;
/* Issue the next read with the transfer context as thread-default, so this
 * callback is dispatched on that context again. */
339 g_main_context_push_thread_default (dh->transfer_context);
340 g_input_stream_read_all_async (in, transfer->read_buffer,
341 transfer->read_buffer_size, G_PRIORITY_DEFAULT, transfer->cancellable,
342 on_read_ready, transfer_task);
343 g_main_context_pop_thread_default (dh->transfer_context);
345 download_request_unlock (request);
/* Final state: only set for a request that is still in use and was not
 * cancelled. Successful and redirect statuses count as COMPLETE. */
349 if (request->in_use && !g_cancellable_is_cancelled (transfer->cancellable)) {
350 SoupStatus status_code = _soup_message_get_status (transfer->msg);
352 GST_LOG ("request complete. Code %d URI %s range %" G_GINT64_FORMAT " %"
353 G_GINT64_FORMAT, status_code, request->uri,
354 request->range_start, request->range_end);
356 if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT) {
357 if (SOUP_STATUS_IS_SUCCESSFUL (status_code)
358 || SOUP_STATUS_IS_REDIRECTION (status_code))
359 request->state = DOWNLOAD_REQUEST_STATE_COMPLETE;
361 request->state = DOWNLOAD_REQUEST_STATE_ERROR;
364 request->download_end_time = now;
366 g_free (transfer->read_buffer);
367 transfer->read_buffer = NULL;
369 download_request_unlock (request);
371 finish_transfer_task (dh, transfer_task, NULL);
/* Header foreach callback that records one HTTP header in a GstStructure.
 *
 * @name / @value: the header being visited
 * @user_data: the GstStructure to fill
 *
 * The name and value are first checked to be valid UTF-8. Repeated headers
 * are accumulated: if the field already holds an array, @value is appended
 * to it; if it holds a single string, the field is replaced by a
 * two-element array of the old and new values; otherwise the field is set
 * to @value as a plain string. */
375 http_header_to_structure (const gchar * name, const gchar * value,
378 GstStructure *headers = user_data;
381 if (!g_utf8_validate (name, -1, NULL) || !g_utf8_validate (value, -1, NULL))
384 gv = gst_structure_get_value (headers, name);
385 if (gv && GST_VALUE_HOLDS_ARRAY (gv)) {
386 GValue v = G_VALUE_INIT;
388 g_value_init (&v, G_TYPE_STRING);
389 g_value_set_string (&v, value);
/* Append in place to the array stored in the structure; the cast drops
 * the const from the value returned by gst_structure_get_value(). */
390 gst_value_array_append_value ((GValue *) gv, &v);
392 } else if (gv && G_VALUE_HOLDS_STRING (gv)) {
393 GValue arr = G_VALUE_INIT;
394 GValue v = G_VALUE_INIT;
395 const gchar *old_value = g_value_get_string (gv);
397 g_value_init (&arr, GST_TYPE_ARRAY);
398 g_value_init (&v, G_TYPE_STRING);
/* v is reused: first for the existing value, then for the new one. */
399 g_value_set_string (&v, old_value);
400 gst_value_array_append_value (&arr, &v);
401 g_value_set_string (&v, value);
402 gst_value_array_append_value (&arr, &v);
404 gst_structure_set_value (headers, name, &arr);
406 g_value_unset (&arr);
408 gst_structure_set (headers, name, G_TYPE_STRING, value, NULL);
/* Handler for the "restarted" signal of the transfer's SoupMessage
 * (connected in downloadhelper_submit_request()).
 *
 * @user_data: the transfer GTask
 *
 * When the message status is a redirection, the URI string obtained from the
 * message and whether the redirect is permanent (301 Moved Permanently) are
 * stored on the request under the request lock. The request takes ownership
 * of the new string; the previous redirect_uri is freed. */
413 soup_msg_restarted_cb (SoupMessage * msg, gpointer user_data)
415 GTask *transfer_task = user_data;
416 DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
417 DownloadRequest *request = transfer->request;
418 SoupStatus status = _soup_message_get_status (msg);
420 if (SOUP_STATUS_IS_REDIRECTION (status)) {
421 char *redirect_uri = gst_soup_message_uri_to_string (msg);
422 gboolean redirect_permanent = (status == SOUP_STATUS_MOVED_PERMANENTLY);
424 GST_DEBUG ("%u redirect to \"%s\" (permanent %d)",
425 status, redirect_uri, redirect_permanent);
427 download_request_lock (request);
428 g_free (request->redirect_uri);
429 request->redirect_uri = redirect_uri;
430 request->redirect_permanent = redirect_permanent;
431 download_request_unlock (request);
/* Collects the HTTP headers of a transfer and extracts what the download
 * needs from the response.
 *
 * An "http-headers" structure is built containing nested "request-headers"
 * and "response-headers" structures, each filled through
 * http_header_to_structure(). For a successful response that uses
 * Content-Length encoding, request->content_length is updated.
 * transfer->read_position is reset to 0 and, for a 206 Partial Content
 * response, set to the start of the Content-Range; a warning is logged when
 * that differs from the requested range start.
 *
 * Returns: a GstStructure, stored by on_request_sent() as request->headers
 * (presumably the "http-headers" structure - confirm).
 *
 * NOTE(review): `src` is used below but is not declared in this function.
 * Those statements look like leftovers adapted from a souphttpsrc element -
 * confirm they are compiled out. */
435 static GstStructure *
436 handle_response_headers (DownloadHelperTransfer * transfer)
438 DownloadRequest *request = transfer->request;
439 SoupMessage *msg = transfer->msg;
440 SoupMessageHeaders *response_headers;
441 GstStructure *http_headers, *headers;
443 http_headers = gst_structure_new_empty ("http-headers");
446 if (msg->status_code == SOUP_STATUS_PROXY_AUTHENTICATION_REQUIRED &&
447 src->proxy_id && src->proxy_pw) {
448 /* wait for authenticate callback */
452 if (src->redirection_uri)
453 gst_structure_set (http_headers, "redirection-uri", G_TYPE_STRING,
454 src->redirection_uri, NULL);
/* gst_structure_set() copies the nested structure, so the temporary
 * `headers` structure is freed after each use. */
457 headers = gst_structure_new_empty ("request-headers");
458 _soup_message_headers_foreach (_soup_message_get_request_headers (msg),
459 http_header_to_structure, headers);
460 gst_structure_set (http_headers, "request-headers", GST_TYPE_STRUCTURE,
462 gst_structure_free (headers);
463 headers = gst_structure_new_empty ("response-headers");
464 response_headers = _soup_message_get_response_headers (msg);
465 _soup_message_headers_foreach (response_headers, http_header_to_structure,
467 gst_structure_set (http_headers, "response-headers", GST_TYPE_STRUCTURE,
469 gst_structure_free (headers);
472 if (msg->status_code == SOUP_STATUS_UNAUTHORIZED) {
474 gst_structure_free (http_headers);
475 return gst_soup_http_src_parse_status (msg, src);
479 /* Parse Content-Length. */
480 if (SOUP_STATUS_IS_SUCCESSFUL (_soup_message_get_status (msg)) &&
481 (_soup_message_headers_get_encoding (response_headers) ==
482 SOUP_ENCODING_CONTENT_LENGTH)) {
483 request->content_length =
484 _soup_message_headers_get_content_length (response_headers);
486 /* Parse Content-Range in a partial content response to set our initial read_position */
487 transfer->read_position = 0;
488 if (_soup_message_get_status (msg) == SOUP_STATUS_PARTIAL_CONTENT) {
490 if (_soup_message_headers_get_content_range (response_headers, &start,
492 GST_DEBUG ("Content-Range response %" G_GOFFSET_FORMAT "-%"
493 G_GOFFSET_FORMAT, start, end);
494 transfer->read_position = start;
/* A mismatch is only logged; on_read_ready() clips data that arrives
 * before range_start. */
497 if (transfer->read_position != request->range_start) {
498 GST_WARNING ("Server did not respect our range request for range %"
499 G_GINT64_FORMAT " to %" G_GINT64_FORMAT " - starting at offset %"
500 G_GUINT64_FORMAT, request->range_start, request->range_end,
501 transfer->read_position);
/* Completion callback for _soup_session_send_async() (issued by
 * submit_transfer()).
 *
 * @source: the SoupSession
 * @result: the async result to finish
 * @user_data: the transfer GTask
 *
 * If sending failed, the status code is recorded and the transfer is
 * finished: with the error when the transfer was not cancelled, otherwise
 * the error is freed and ignored. On success the request moves to
 * HEADERS_RECEIVED, its status code and headers are recorded and progress is
 * reported. For a successful or redirect status a read buffer is allocated
 * and the first asynchronous body read is started. Any other status, or
 * failure to get a read buffer, goes to finish_transfer_error, which marks
 * the request as ERROR (unless cancelled) and finishes the transfer. */
508 on_request_sent (GObject * source, GAsyncResult * result, gpointer user_data)
510 GTask *transfer_task = user_data;
511 DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
513 DownloadHelper *dh = transfer->dh;
514 DownloadRequest *request = transfer->request;
515 SoupMessage *msg = transfer->msg;
516 GError *error = NULL;
519 _soup_session_send_finish ((SoupSession *) source, result, &error);
521 download_request_lock (request);
524 request->status_code = _soup_message_get_status (msg);
526 if (!g_cancellable_is_cancelled (transfer->cancellable)) {
527 GST_LOG ("request errored. Code %d URI %s range %" G_GINT64_FORMAT " %"
528 G_GINT64_FORMAT, request->status_code, request->uri,
529 request->range_start, request->range_end);
531 if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT)
532 request->state = DOWNLOAD_REQUEST_STATE_ERROR;
533 finish_transfer_task (dh, transfer_task, error);
535 /* Ignore error from cancelled operation */
536 g_error_free (error);
537 finish_transfer_task (dh, transfer_task, NULL);
539 download_request_unlock (request);
/* NOTE(review): finish_transfer_task() has already been called in both
 * branches above, which removes the task from the active list and either
 * hands `error` to the task or frees it. This further call looks like it can
 * only reach the "Did not find transfer" warning while passing a stale
 * error pointer - confirm and drop if redundant. */
541 /* No async callback queued - the transfer is done */
542 finish_transfer_task (dh, transfer_task, error);
546 /* If the state went back to UNSENT, we were cancelled so don't override it */
547 if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT &&
548 request->state != DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED) {
550 request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED;
551 request->status_code = _soup_message_get_status (msg);
552 request->headers = handle_response_headers (transfer);
553 GST_TRACE ("request URI %s range %" G_GINT64_FORMAT " %"
554 G_GINT64_FORMAT " headers: %" GST_PTR_FORMAT,
555 request->uri, request->range_start, request->range_end,
558 if (SOUP_STATUS_IS_SUCCESSFUL (request->status_code)
559 || SOUP_STATUS_IS_REDIRECTION (request->status_code)) {
560 request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED;
561 transfer_task_report_progress (transfer_task);
563 goto finish_transfer_error;
567 if (!new_read_buffer (transfer))
568 goto finish_transfer_error;
570 download_request_unlock (request);
/* Start reading the body with the transfer context as thread-default, so
 * on_read_ready() is dispatched on that context. */
572 g_main_context_push_thread_default (dh->transfer_context);
573 g_input_stream_read_all_async (in, transfer->read_buffer,
574 transfer->read_buffer_size, G_PRIORITY_DEFAULT, transfer->cancellable,
575 on_read_ready, transfer_task);
576 g_main_context_pop_thread_default (dh->transfer_context);
581 finish_transfer_error:
582 request->download_end_time = gst_adaptive_demux_clock_get_time (dh->clock);
584 if (request->in_use && !g_cancellable_is_cancelled (transfer->cancellable)) {
585 GST_LOG ("request complete. Code %d URI %s range %" G_GINT64_FORMAT " %"
586 G_GINT64_FORMAT, _soup_message_get_status (msg), request->uri,
587 request->range_start, request->range_end);
588 if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT)
589 request->state = DOWNLOAD_REQUEST_STATE_ERROR;
592 g_free (transfer->read_buffer);
593 transfer->read_buffer = NULL;
595 download_request_unlock (request);
596 finish_transfer_task (dh, transfer_task, NULL);
/* Allocates and initialises a DownloadHelper.
 *
 * @clock: clock used for download timestamps; the helper takes its own
 * reference.
 *
 * Sets up a private GMainContext with a GMainLoop on it, the transfer lock,
 * the (initially empty) active-transfer array whose clear function unrefs
 * removed tasks, the queue of pending transfer tasks, and a SoupSession with
 * a 10 second timeout. The transfer thread is started separately by
 * downloadhelper_start(). */
601 downloadhelper_new (GstAdaptiveDemuxClock * clock)
603 DownloadHelper *dh = g_new0 (DownloadHelper, 1);
605 dh->transfer_context = g_main_context_new ();
606 dh->loop = g_main_loop_new (dh->transfer_context, FALSE);
608 dh->clock = gst_adaptive_demux_clock_ref (clock);
610 g_mutex_init (&dh->transfer_lock);
611 dh->active_transfers = g_array_new (FALSE, FALSE, sizeof (GTask *));
613 g_array_set_clear_func (dh->active_transfers,
614 (GDestroyNotify) (release_transfer_task_by_ref));
/* Tasks still queued when the queue is destroyed are unreffed. */
616 dh->transfer_requests =
617 g_async_queue_new_full ((GDestroyNotify) g_object_unref);
618 dh->transfer_requests_source = NULL;
620 /* libsoup 3.0 (not 2.74 or 3.1) dispatches using a single source attached
621 * when the session is created, so we need to ensure it matches here. */
622 g_main_context_push_thread_default (dh->transfer_context);
624 /* Set 10 second timeout. Any longer is likely
625 * an attempt to reuse an already closed connection */
626 dh->session = _soup_session_new_with_options ("timeout", 10, NULL);
628 g_main_context_pop_thread_default (dh->transfer_context);
/* Stops the helper (see downloadhelper_stop()) and releases what it owns:
 * the SoupSession, the main loop and context, the clock reference, the
 * active-transfer array, the pending-request queue, and the referer,
 * user-agent and cookie strings. */
634 downloadhelper_free (DownloadHelper * dh)
636 downloadhelper_stop (dh);
639 g_object_unref (dh->session);
640 g_main_loop_unref (dh->loop);
641 g_main_context_unref (dh->transfer_context);
644 gst_adaptive_demux_clock_unref (dh->clock);
/* Freeing with TRUE also releases the element storage. */
646 g_array_free (dh->active_transfers, TRUE);
647 g_async_queue_unref (dh->transfer_requests);
649 g_free (dh->referer);
650 g_free (dh->user_agent);
651 g_strfreev (dh->cookies);
/* Sets the default Referer header value, used by
 * downloadhelper_submit_request() when a request does not pass its own.
 * @referer is copied (NULL clears it); the previous value is freed. The
 * update is done under the transfer lock. */
657 downloadhelper_set_referer (DownloadHelper * dh, const gchar * referer)
659 g_mutex_lock (&dh->transfer_lock);
660 g_free (dh->referer);
661 dh->referer = g_strdup (referer);
662 g_mutex_unlock (&dh->transfer_lock);
/* Sets the User-Agent header value added to requests by
 * downloadhelper_submit_request(). @user_agent is copied (NULL clears it);
 * the previous value is freed. The update is done under the transfer lock. */
666 downloadhelper_set_user_agent (DownloadHelper * dh, const gchar * user_agent)
668 g_mutex_lock (&dh->transfer_lock);
669 g_free (dh->user_agent);
670 dh->user_agent = g_strdup (user_agent);
671 g_mutex_unlock (&dh->transfer_lock);
/* Replaces the NULL-terminated list of cookies that
 * downloadhelper_submit_request() sends as Cookie headers. The previous list
 * is freed and @cookies is stored without copying. The update is done under
 * the transfer lock. */
674 /* Takes ownership of the strv */
676 downloadhelper_set_cookies (DownloadHelper * dh, gchar ** cookies)
678 g_mutex_lock (&dh->transfer_lock);
679 g_strfreev (dh->cookies);
680 dh->cookies = cookies;
681 g_mutex_unlock (&dh->transfer_lock);
/* Hands one queued transfer to libsoup.
 *
 * Under the request lock the request moves to the OPEN state, its request
 * time is stamped from the helper's clock, and a progress report is
 * scheduled. The message is then sent asynchronously with on_request_sent()
 * as the callback, and the task pointer is appended to active_transfers
 * without taking an additional reference. */
684 /* Called with the transfer lock held */
686 submit_transfer (DownloadHelper * dh, GTask * transfer_task)
688 DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
689 DownloadRequest *request = transfer->request;
691 download_request_lock (request);
692 request->state = DOWNLOAD_REQUEST_STATE_OPEN;
693 request->download_request_time =
694 gst_adaptive_demux_clock_get_time (dh->clock);
696 GST_LOG ("Submitting request URI %s range %" G_GINT64_FORMAT " %"
697 G_GINT64_FORMAT, request->uri, request->range_start, request->range_end);
699 transfer_task_report_progress (transfer_task);
700 download_request_unlock (request);
702 _soup_session_send_async (dh->session, transfer->msg, transfer->cancellable,
703 on_request_sent, transfer_task);
704 g_array_append_val (dh->active_transfers, transfer_task);
/* Runs on the transfer context. With the transfer lock held, pops tasks
 * from dh->transfer_requests without blocking until the queue is empty and
 * passes each to submit_transfer(). The idle source is then destroyed and
 * dh->transfer_requests_source reset to NULL, so that
 * downloadhelper_submit_request() creates a fresh source for the next
 * request.
 *
 * Returns: G_SOURCE_REMOVE. */
707 /* Idle callback that submits all pending transfers */
709 submit_transfers_cb (DownloadHelper * dh)
713 g_mutex_lock (&dh->transfer_lock);
715 transfer = g_async_queue_try_pop (dh->transfer_requests);
717 submit_transfer (dh, transfer);
719 } while (transfer != NULL);
721 /* FIXME: Use a PollFD like GWakeup instead? */
722 g_source_destroy (dh->transfer_requests_source);
723 g_source_unref (dh->transfer_requests_source);
724 dh->transfer_requests_source = NULL;
726 g_mutex_unlock (&dh->transfer_lock);
728 return G_SOURCE_REMOVE;
/* Body of the transfer thread created by downloadhelper_start().
 *
 * @data: the DownloadHelper
 *
 * Makes the helper's transfer context the thread-default context and runs
 * the main loop until it is quit (downloadhelper_stop() calls
 * g_main_loop_quit()). */
732 dh_transfer_thread_func (gpointer data)
734 DownloadHelper *dh = data;
735 GST_DEBUG ("DownloadHelper thread starting");
737 g_main_context_push_thread_default (dh->transfer_context);
738 g_main_loop_run (dh->loop);
739 g_main_context_pop_thread_default (dh->transfer_context);
741 GST_DEBUG ("Exiting DownloadHelper thread");
/* Starts the transfer thread running dh_transfer_thread_func().
 *
 * Calling this while a transfer thread already exists is a programming
 * error: the precondition check returns FALSE. The thread is created with
 * g_thread_try_new() under the transfer lock, and dh->running records
 * whether creation succeeded. */
746 downloadhelper_start (DownloadHelper * dh)
748 g_return_val_if_fail (dh->transfer_thread == NULL, FALSE);
750 g_mutex_lock (&dh->transfer_lock);
753 dh->transfer_thread =
754 g_thread_try_new ("adaptive-download-task", dh_transfer_thread_func, dh,
756 dh->running = (dh->transfer_thread != NULL);
758 g_mutex_unlock (&dh->transfer_lock);
/* Stops the transfer loop and cleans up outstanding transfers.
 *
 * Under the transfer lock every active transfer is cancelled, the main loop
 * is asked to quit and the thread handle is taken out of the helper. The
 * lock is released before joining so the transfer thread can finish its
 * callbacks. After the join, every transfer still in the active list has its
 * request reset to UNSENT, is marked complete, and has its waiter woken
 * (blocking) or its task returned. The list is then emptied, which drops the
 * task references. */
764 downloadhelper_stop (DownloadHelper * dh)
767 GThread *transfer_thread = NULL;
769 GST_DEBUG ("Stopping DownloadHelper loop");
771 g_mutex_lock (&dh->transfer_lock);
775 for (i = 0; i < dh->active_transfers->len; i++) {
776 GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i);
777 DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
778 g_cancellable_cancel (transfer->cancellable);
781 g_main_loop_quit (dh->loop);
/* Clear the field so downloadhelper_start() can be called again. */
783 transfer_thread = dh->transfer_thread;
784 dh->transfer_thread = NULL;
786 g_mutex_unlock (&dh->transfer_lock);
788 if (transfer_thread != NULL) {
789 g_thread_join (transfer_thread);
792 /* The transfer thread has exited at this point - any remaining transfers are unfinished
793 * and need cleaning up */
794 g_mutex_lock (&dh->transfer_lock);
796 for (i = 0; i < dh->active_transfers->len; i++) {
797 GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i);
798 DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
799 DownloadRequest *request = transfer->request;
801 download_request_lock (request);
802 /* Reset the state to UNSENT, to indicate cancellation, like an XMLHttpRequest does */
803 request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
804 download_request_unlock (request);
806 transfer->complete = TRUE;
807 if (transfer->blocking)
808 g_cond_broadcast (&transfer->cond);
810 g_task_return_boolean (transfer_task, TRUE);
/* Shrinking the array runs the clear function on every element. */
813 g_array_set_size (dh->active_transfers, 0);
814 g_mutex_unlock (&dh->transfer_lock);
/* Builds the HTTP message for @request and queues it for the transfer loop.
 *
 * @referer: Referer header for this request; when NULL the helper's default
 *   referer (if any) is used instead
 * @flags: DOWNLOAD_FLAG_HEADERS_ONLY selects HEAD instead of GET;
 *   DOWNLOAD_FLAG_COMPRESS leaves the content decoder enabled;
 *   DOWNLOAD_FLAG_FORCE_REFRESH adds "Cache-Control: max-age=0";
 *   DOWNLOAD_FLAG_BLOCKING waits here until the transfer completes
 * @request: must not already be in use
 * @err: set when the request URI cannot be parsed
 *
 * The request state is reset to UNSENT, a Range header is added when a
 * range is set, and the helper's user-agent and cookies are applied under
 * the transfer lock. The transfer task is pushed onto dh->transfer_requests
 * and, if none is pending, an idle source running submit_transfers_cb() is
 * attached to the transfer context.
 *
 * NOTE(review): downloadhelper_fetch_uri_range() treats a false return as
 * failure, so this presumably returns TRUE once queued / completed -
 * confirm. */
818 downloadhelper_submit_request (DownloadHelper * dh,
819 const gchar * referer, DownloadFlags flags, DownloadRequest * request,
822 GTask *transfer_task = NULL;
825 SoupMessageHeaders *msg_headers;
826 gboolean blocking = (flags & DOWNLOAD_FLAG_BLOCKING) != 0;
829 (flags & DOWNLOAD_FLAG_HEADERS_ONLY) ? SOUP_METHOD_HEAD : SOUP_METHOD_GET;
831 download_request_lock (request);
832 if (request->in_use) {
833 GST_ERROR ("Request for URI %s reusing active request object",
835 download_request_unlock (request);
839 /* Clear the state back to unsent */
840 request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
842 msg = _soup_message_new (method, request->uri);
844 g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
845 "Could not parse download URI %s", request->uri);
847 request->state = DOWNLOAD_REQUEST_STATE_ERROR;
848 download_request_unlock (request);
853 /* NOTE: There was a bug where Akamai servers return the
854 * wrong result for a range request on small files. To avoid
855 * it if the range starts within the first KB of the file, just
856 * start at 0 instead */
857 if (request->range_start < 1024)
858 request->range_start = 0;
860 msg_headers = _soup_message_get_request_headers (msg);
/* A range of 0 to -1 means the whole resource, so no Range header. */
862 if (request->range_start != 0 || request->range_end != -1) {
863 _soup_message_headers_set_range (msg_headers, request->range_start,
867 download_request_unlock (request);
869 /* If resubmitting a request, clear any stale / unused data */
870 download_request_begin_download (request);
/* Without DOWNLOAD_FLAG_COMPRESS the content decoder feature is disabled
 * for this message. */
872 if ((flags & DOWNLOAD_FLAG_COMPRESS) == 0) {
873 _soup_message_disable_feature (msg, _soup_content_decoder_get_type ());
875 if (flags & DOWNLOAD_FLAG_FORCE_REFRESH) {
876 _soup_message_headers_append (msg_headers, "Cache-Control", "max-age=0");
879 /* Take the lock to protect header strings */
880 g_mutex_lock (&dh->transfer_lock);
882 if (referer != NULL) {
883 _soup_message_headers_append (msg_headers, "Referer", referer);
884 } else if (dh->referer != NULL) {
885 _soup_message_headers_append (msg_headers, "Referer", dh->referer);
888 if (dh->user_agent != NULL) {
889 _soup_message_headers_append (msg_headers, "User-Agent", dh->user_agent);
892 if (dh->cookies != NULL) {
/* One Cookie header is appended per entry of the NULL-terminated list. */
895 for (cookie = dh->cookies; *cookie != NULL; cookie++) {
896 _soup_message_headers_append (msg_headers, "Cookie", *cookie);
900 transfer_task = transfer_task_new (dh, request, msg, blocking);
903 /* The download helper was deactivated just as we went to dispatch this request.
904 * Abort and manually wake the request, as it never went in the active_transfer list */
905 g_mutex_unlock (&dh->transfer_lock);
907 download_request_lock (request);
908 request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
909 request->in_use = FALSE;
910 download_request_unlock (request);
912 g_cancellable_cancel (g_task_get_cancellable (transfer_task));
913 g_task_return_error_if_cancelled (transfer_task);
914 g_object_unref (transfer_task);
919 download_request_lock (request);
920 request->in_use = TRUE;
921 download_request_unlock (request);
/* soup_msg_restarted_cb() records redirects on the request. */
923 g_signal_connect (msg, "restarted", G_CALLBACK (soup_msg_restarted_cb),
926 /* Now send the request over to the main loop for actual submission */
927 GST_LOG ("Submitting transfer task %p", transfer_task);
928 g_async_queue_push (dh->transfer_requests, transfer_task);
930 /* No pending idle source to wake the transfer loop - so create one */
931 if (dh->transfer_requests_source == NULL) {
932 dh->transfer_requests_source = g_idle_source_new ();
933 g_source_set_callback (dh->transfer_requests_source,
934 (GSourceFunc) submit_transfers_cb, dh, NULL);
935 g_source_attach (dh->transfer_requests_source, dh->transfer_context);
939 DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
941 /* We need an extra ref on the task to make sure it stays alive.
942 * We pushed it in the async queue, but didn't unlock yet, so while
943 * we gave away our ref, the receiver can't have unreffed it */
944 g_object_ref (transfer_task);
/* g_cond_wait() releases the transfer lock while waiting; the loop guards
 * against spurious wakeups. */
945 while (!transfer->complete)
946 g_cond_wait (&transfer->cond, &dh->transfer_lock);
947 g_object_unref (transfer_task);
950 g_mutex_unlock (&dh->transfer_lock);
/* Cancels an in-flight @request.
 *
 * With the transfer lock and then the request lock held: a request that is
 * in use has its state reset to UNSENT, and the active transfer list is
 * searched from the end for the transfer servicing it, whose GCancellable is
 * cancelled. The async callbacks check that cancellable and finish the
 * transfer without overriding the UNSENT state. */
956 downloadhelper_cancel_request (DownloadHelper * dh, DownloadRequest * request)
960 g_mutex_lock (&dh->transfer_lock);
962 download_request_lock (request);
963 if (!request->in_use)
966 GST_DEBUG ("Cancelling request for URI %s range %" G_GINT64_FORMAT " %"
967 G_GINT64_FORMAT, request->uri, request->range_start, request->range_end);
969 request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
971 for (i = dh->active_transfers->len - 1; i >= 0; i--) {
972 GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i);
973 DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
975 if (transfer->request == request) {
976 GST_DEBUG ("Found transfer %p for request for URI %s range %"
977 G_GINT64_FORMAT " %" G_GINT64_FORMAT, transfer, request->uri,
978 request->range_start, request->range_end);
979 g_cancellable_cancel (transfer->cancellable);
985 download_request_unlock (request);
986 g_mutex_unlock (&dh->transfer_lock);
/* Blocking convenience wrapper: creates a new DownloadRequest for @uri
 * covering @range_start to @range_end and submits it with
 * DOWNLOAD_FLAG_BLOCKING forced on, so the call waits for the transfer.
 *
 * @uri: must not be NULL (the precondition check returns NULL otherwise)
 * @referer, @flags, @err: passed through to downloadhelper_submit_request()
 *
 * If submission fails the new request is unreffed.
 * NOTE(review): presumably returns the request on success and NULL on
 * failure - confirm. */
990 downloadhelper_fetch_uri_range (DownloadHelper * dh, const gchar * uri,
991 const gchar * referer, DownloadFlags flags, gint64 range_start,
992 gint64 range_end, GError ** err)
994 DownloadRequest *request;
996 g_return_val_if_fail (uri != NULL, NULL);
998 GST_DEBUG ("Fetching URI %s range %" G_GINT64_FORMAT " %" G_GINT64_FORMAT,
999 uri, range_start, range_end);
1001 flags |= DOWNLOAD_FLAG_BLOCKING;
1003 request = download_request_new_uri_range (uri, range_start, range_end);
1005 if (!downloadhelper_submit_request (dh, referer, flags, request, err)) {
1006 download_request_unref (request);
/* Blocking fetch of a whole resource: forwards to
 * downloadhelper_fetch_uri_range() with a range of 0 to -1, where a
 * range_end of -1 means no end of range. */
1014 downloadhelper_fetch_uri (DownloadHelper * dh, const gchar * uri,
1015 const gchar * referer, DownloadFlags flags, GError ** err)
1017 return downloadhelper_fetch_uri_range (dh, uri, referer, flags, 0, -1, err);