adaptivedemux2/downloadhelper: Add debug output of response headers
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / ext / adaptivedemux2 / downloadhelper.c
1 /* GStreamer
2  * Copyright (C) 2021-2022 Jan Schmidt <jan@centricular.com>
3  *
4  * downloadhelper.c:
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * Youshould have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21
22 #include "downloadhelper.h"
23 #include "../soup/gstsouploader.h"
24
25 GST_DEBUG_CATEGORY_EXTERN (adaptivedemux2_debug);
26 #define GST_CAT_DEFAULT adaptivedemux2_debug
27
28 #define CHUNK_BUFFER_SIZE 32768
29
30 typedef struct DownloadHelperTransfer DownloadHelperTransfer;
31
32 struct DownloadHelper
33 {
34   GThread *transfer_thread;
35
36   gboolean running;
37
38   GstAdaptiveDemuxClock *clock;
39
40   GMainContext *transfer_context;
41   GMainLoop *loop;
42   SoupSession *session;
43
44   GMutex transfer_lock;
45   GArray *active_transfers;
46
47   GAsyncQueue *transfer_requests;
48   GSource *transfer_requests_source;
49
50   gchar *referer;
51   gchar *user_agent;
52   gchar **cookies;
53 };
54
55 struct DownloadHelperTransfer
56 {
57   DownloadHelper *dh;
58
59   gboolean blocking;
60   gboolean complete;
61   gboolean progress_pending;
62
63   GCond cond;
64
65   GCancellable *cancellable;
66
67   SoupMessage *msg;
68   gboolean request_sent;
69
70   /* Current read buffer */
71   char *read_buffer;
72   guint64 read_buffer_size;
73   guint64 read_position;        /* Start in bytes of the read_buffer */
74
75   DownloadRequest *request;
76 };
77
78 static void
79 free_transfer (DownloadHelperTransfer * transfer)
80 {
81   DownloadRequest *request = transfer->request;
82
83   if (request)
84     download_request_unref (request);
85
86   if (transfer->blocking)
87     g_cond_clear (&transfer->cond);
88
89   g_object_unref (transfer->msg);
90   g_free (transfer->read_buffer);
91   g_free (transfer);
92 }
93
94 static void
95 transfer_completion_cb (gpointer src_object, GAsyncResult * res,
96     gpointer user_data)
97 {
98   DownloadHelperTransfer *transfer = g_task_get_task_data ((GTask *) res);
99   DownloadRequest *request = transfer->request;
100
101   if (transfer->blocking)
102     return;                     /* Somehow a completion got signalled for a blocking request */
103
104   download_request_lock (request);
105   request->in_use = FALSE;
106   GST_LOG ("Despatching completion for transfer %p request %p", transfer,
107       request);
108   download_request_despatch_completion (request);
109   download_request_unlock (request);
110 }
111
112 static gboolean
113 transfer_report_progress_cb (gpointer task)
114 {
115   DownloadHelperTransfer *transfer;
116   DownloadRequest *request;
117
118   /* Already completed - late callback */
119   if (g_task_get_completed (task))
120     return FALSE;
121
122   transfer = g_task_get_task_data (task);
123   request = transfer->request;
124
125   download_request_lock (request);
126   if (request->send_progress) {
127     GST_LOG ("Despatching progress for transfer %p request %p", transfer,
128         request);
129     download_request_despatch_progress (request);
130   }
131   transfer->progress_pending = FALSE;
132   download_request_unlock (request);
133
134   return FALSE;
135 }
136
137 static GTask *
138 transfer_task_new (DownloadHelper * dh, DownloadRequest * request,
139     SoupMessage * msg, gboolean blocking)
140 {
141   GTask *transfer_task = NULL;
142   DownloadHelperTransfer *transfer = g_new0 (DownloadHelperTransfer, 1);
143
144   transfer->blocking = blocking;
145   if (transfer->blocking)
146     g_cond_init (&transfer->cond);
147
148   transfer->cancellable = g_cancellable_new ();
149   transfer->request = download_request_ref (request);
150
151   transfer->dh = dh;
152   transfer->msg = msg;
153
154   transfer_task =
155       g_task_new (NULL, transfer->cancellable,
156       (GAsyncReadyCallback) transfer_completion_cb, NULL);
157   g_task_set_task_data (transfer_task, transfer,
158       (GDestroyNotify) free_transfer);
159
160   return transfer_task;
161 }
162
163 static void
164 release_transfer_task_by_ref (GTask ** transfer_task)
165 {
166   g_object_unref (*transfer_task);
167 }
168
169 /* Called with download_request lock held */
170 static void
171 transfer_task_report_progress (GTask * transfer_task)
172 {
173   DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
174   DownloadRequest *request = transfer->request;
175   GSource *idle_source;
176
177   if (transfer->progress_pending == TRUE || !request->send_progress)
178     return;
179
180   /* There's no progress cb pending and this download wants reports, so
181    * attach an idle source */
182   transfer->progress_pending = TRUE;
183   idle_source = g_idle_source_new ();
184   g_task_attach_source (transfer_task, idle_source,
185       transfer_report_progress_cb);
186   g_source_unref (idle_source);
187 }
188
189 static void
190 finish_transfer_task (DownloadHelper * dh, GTask * transfer_task,
191     GError * error)
192 {
193   int i;
194
195   g_mutex_lock (&dh->transfer_lock);
196   for (i = dh->active_transfers->len - 1; i >= 0; i--) {
197     if (transfer_task == g_array_index (dh->active_transfers, GTask *, i)) {
198       DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
199
200       transfer->complete = TRUE;
201
202       if (transfer->blocking)
203         g_cond_broadcast (&transfer->cond);
204       else if (error != NULL)
205         g_task_return_error (transfer_task, error);
206       else
207         g_task_return_boolean (transfer_task, TRUE);
208
209       /* This drops the task ref: */
210       g_array_remove_index_fast (dh->active_transfers, i);
211       g_mutex_unlock (&dh->transfer_lock);
212       return;
213     }
214   }
215   g_mutex_unlock (&dh->transfer_lock);
216
217   GST_WARNING ("Did not find transfer %p in the active transfer list",
218       transfer_task);
219 }
220
221 static gboolean
222 new_read_buffer (DownloadHelperTransfer * transfer)
223 {
224   gint buffer_size = CHUNK_BUFFER_SIZE;
225 #if 0
226   DownloadRequest *request = transfer->request;
227
228   if (request->range_end != -1) {
229     if (request->range_end <= transfer->read_position) {
230       transfer->read_buffer = NULL;
231       transfer->read_buffer_size = 0;
232       return FALSE;
233     }
234     if (request->range_end - transfer->read_position < buffer_size)
235       buffer_size = request->range_end - transfer->read_position + 1;
236   }
237 #endif
238
239   transfer->read_buffer = g_new (char, buffer_size);
240   transfer->read_buffer_size = buffer_size;
241   return TRUE;
242 }
243
244 static void
245 on_read_ready (GObject * source, GAsyncResult * result, gpointer user_data)
246 {
247   GTask *transfer_task = user_data;
248   DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
249
250   DownloadHelper *dh = transfer->dh;
251   DownloadRequest *request = transfer->request;
252
253   GInputStream *in = G_INPUT_STREAM (source);
254   GError *error = NULL;
255   gsize bytes_read = 0;
256
257   GstClockTime now = gst_adaptive_demux_clock_get_time (dh->clock);
258
259   gboolean read_failed =
260       g_input_stream_read_all_finish (in, result, &bytes_read, &error);
261
262   download_request_lock (request);
263
264   if (error) {
265     g_free (transfer->read_buffer);
266     transfer->read_buffer = NULL;
267
268     if (!g_cancellable_is_cancelled (transfer->cancellable)) {
269       GST_ERROR ("Failed to read stream: %s", error->message);
270       if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT)
271         request->state = DOWNLOAD_REQUEST_STATE_ERROR;
272       finish_transfer_task (dh, transfer_task, error);
273     } else {
274       /* Ignore error from cancelled operation */
275       g_error_free (error);
276       finish_transfer_task (dh, transfer_task, NULL);
277     }
278     download_request_unlock (request);
279
280     return;
281   }
282
283   if (bytes_read > 0) {
284     GstBuffer *gst_buffer =
285         gst_buffer_new_wrapped (transfer->read_buffer, bytes_read);
286
287     GST_BUFFER_OFFSET (gst_buffer) = transfer->read_position;
288     transfer->read_position += bytes_read;
289     transfer->read_buffer = NULL;
290
291     /* Clip the buffer to within the range */
292     if (GST_BUFFER_OFFSET (gst_buffer) < request->range_start) {
293       if (transfer->read_position <= request->range_start) {
294         GST_DEBUG ("Discarding %" G_GSIZE_FORMAT
295             " bytes entirely before requested range",
296             gst_buffer_get_size (gst_buffer));
297         /* This buffer is completely before the range start, discard it */
298         gst_buffer_unref (gst_buffer);
299         gst_buffer = NULL;
300       } else {
301         GST_DEBUG ("Clipping first %" G_GINT64_FORMAT
302             " bytes before requested range",
303             request->range_start - GST_BUFFER_OFFSET (gst_buffer));
304
305         /* This buffer is partially within the requested range, clip the beginning */
306         gst_buffer_resize (gst_buffer,
307             request->range_start - GST_BUFFER_OFFSET (gst_buffer), -1);
308         GST_BUFFER_OFFSET (gst_buffer) = request->range_start;
309       }
310     }
311
312     if (request->download_start_time == GST_CLOCK_TIME_NONE) {
313       GST_LOG ("Got first data for URI %s", request->uri);
314       request->download_start_time = now;
315     }
316
317     if (gst_buffer != NULL) {
318       /* Unsent means cancellation is in progress, so don't override
319        * the state. Otherwise make sure it is LOADING */
320       if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT)
321         request->state = DOWNLOAD_REQUEST_STATE_LOADING;
322
323       GST_LOG ("Adding %u bytes to buffer",
324           (guint) (gst_buffer_get_size (gst_buffer)));
325
326       download_request_add_buffer (request, gst_buffer);
327
328       transfer_task_report_progress (transfer_task);
329     }
330   } else if (read_failed) {
331     /* The read failed and returned 0 bytes: We're done */
332     goto finish_transfer;
333   }
334
335   /* Resubmit the read request to get more */
336   if (!new_read_buffer (transfer))
337     goto finish_transfer;
338
339   g_main_context_push_thread_default (dh->transfer_context);
340   g_input_stream_read_all_async (in, transfer->read_buffer,
341       transfer->read_buffer_size, G_PRIORITY_DEFAULT, transfer->cancellable,
342       on_read_ready, transfer_task);
343   g_main_context_pop_thread_default (dh->transfer_context);
344
345   download_request_unlock (request);
346   return;
347
348 finish_transfer:
349   if (request->in_use && !g_cancellable_is_cancelled (transfer->cancellable)) {
350     SoupStatus status_code = _soup_message_get_status (transfer->msg);
351
352     GST_LOG ("request complete. Code %d URI %s range %" G_GINT64_FORMAT " %"
353         G_GINT64_FORMAT, status_code, request->uri,
354         request->range_start, request->range_end);
355
356     if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT) {
357       if (SOUP_STATUS_IS_SUCCESSFUL (status_code)
358           || SOUP_STATUS_IS_REDIRECTION (status_code))
359         request->state = DOWNLOAD_REQUEST_STATE_COMPLETE;
360       else
361         request->state = DOWNLOAD_REQUEST_STATE_ERROR;
362     }
363   }
364   request->download_end_time = now;
365
366   g_free (transfer->read_buffer);
367   transfer->read_buffer = NULL;
368
369   download_request_unlock (request);
370
371   finish_transfer_task (dh, transfer_task, NULL);
372 }
373
374 static void
375 http_header_to_structure (const gchar * name, const gchar * value,
376     gpointer user_data)
377 {
378   GstStructure *headers = user_data;
379   const GValue *gv;
380
381   if (!g_utf8_validate (name, -1, NULL) || !g_utf8_validate (value, -1, NULL))
382     return;
383
384   gv = gst_structure_get_value (headers, name);
385   if (gv && GST_VALUE_HOLDS_ARRAY (gv)) {
386     GValue v = G_VALUE_INIT;
387
388     g_value_init (&v, G_TYPE_STRING);
389     g_value_set_string (&v, value);
390     gst_value_array_append_value ((GValue *) gv, &v);
391     g_value_unset (&v);
392   } else if (gv && G_VALUE_HOLDS_STRING (gv)) {
393     GValue arr = G_VALUE_INIT;
394     GValue v = G_VALUE_INIT;
395     const gchar *old_value = g_value_get_string (gv);
396
397     g_value_init (&arr, GST_TYPE_ARRAY);
398     g_value_init (&v, G_TYPE_STRING);
399     g_value_set_string (&v, old_value);
400     gst_value_array_append_value (&arr, &v);
401     g_value_set_string (&v, value);
402     gst_value_array_append_value (&arr, &v);
403
404     gst_structure_set_value (headers, name, &arr);
405     g_value_unset (&v);
406     g_value_unset (&arr);
407   } else {
408     gst_structure_set (headers, name, G_TYPE_STRING, value, NULL);
409   }
410 }
411
412 static void
413 soup_msg_restarted_cb (SoupMessage * msg, gpointer user_data)
414 {
415   GTask *transfer_task = user_data;
416   DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
417   DownloadRequest *request = transfer->request;
418   SoupStatus status = _soup_message_get_status (msg);
419
420   if (SOUP_STATUS_IS_REDIRECTION (status)) {
421     char *redirect_uri = gst_soup_message_uri_to_string (msg);
422     gboolean redirect_permanent = (status == SOUP_STATUS_MOVED_PERMANENTLY);
423
424     GST_DEBUG ("%u redirect to \"%s\" (permanent %d)",
425         status, redirect_uri, redirect_permanent);
426
427     download_request_lock (request);
428     g_free (request->redirect_uri);
429     request->redirect_uri = redirect_uri;
430     request->redirect_permanent = redirect_permanent;
431     download_request_unlock (request);
432   }
433 }
434
435 static GstStructure *
436 handle_response_headers (DownloadHelperTransfer * transfer)
437 {
438   DownloadRequest *request = transfer->request;
439   SoupMessage *msg = transfer->msg;
440   SoupMessageHeaders *response_headers;
441   GstStructure *http_headers, *headers;
442
443   http_headers = gst_structure_new_empty ("http-headers");
444
445 #if 0
446   if (msg->status_code == SOUP_STATUS_PROXY_AUTHENTICATION_REQUIRED &&
447       src->proxy_id && src->proxy_pw) {
448     /* wait for authenticate callback */
449     return GST_FLOW_OK;
450   }
451
452   if (src->redirection_uri)
453     gst_structure_set (http_headers, "redirection-uri", G_TYPE_STRING,
454         src->redirection_uri, NULL);
455 #endif
456
457   headers = gst_structure_new_empty ("request-headers");
458   _soup_message_headers_foreach (_soup_message_get_request_headers (msg),
459       http_header_to_structure, headers);
460   gst_structure_set (http_headers, "request-headers", GST_TYPE_STRUCTURE,
461       headers, NULL);
462   gst_structure_free (headers);
463   headers = gst_structure_new_empty ("response-headers");
464   response_headers = _soup_message_get_response_headers (msg);
465   _soup_message_headers_foreach (response_headers, http_header_to_structure,
466       headers);
467   gst_structure_set (http_headers, "response-headers", GST_TYPE_STRUCTURE,
468       headers, NULL);
469   gst_structure_free (headers);
470
471 #if 0
472   if (msg->status_code == SOUP_STATUS_UNAUTHORIZED) {
473     /* force an error */
474     gst_structure_free (http_headers);
475     return gst_soup_http_src_parse_status (msg, src);
476   }
477 #endif
478
479   /* Parse Content-Length. */
480   if (SOUP_STATUS_IS_SUCCESSFUL (_soup_message_get_status (msg)) &&
481       (_soup_message_headers_get_encoding (response_headers) ==
482           SOUP_ENCODING_CONTENT_LENGTH)) {
483     request->content_length =
484         _soup_message_headers_get_content_length (response_headers);
485   }
486   /* Parse Content-Range in a partial content response to set our initial read_position */
487   transfer->read_position = 0;
488   if (_soup_message_get_status (msg) == SOUP_STATUS_PARTIAL_CONTENT) {
489     goffset start, end;
490     if (_soup_message_headers_get_content_range (response_headers, &start,
491             &end, NULL)) {
492       GST_DEBUG ("Content-Range response %" G_GOFFSET_FORMAT "-%"
493           G_GOFFSET_FORMAT, start, end);
494       transfer->read_position = start;
495     }
496   }
497   if (transfer->read_position != request->range_start) {
498     GST_WARNING ("Server did not respect our range request for range %"
499         G_GINT64_FORMAT " to %" G_GINT64_FORMAT " - starting at offset %"
500         G_GUINT64_FORMAT, request->range_start, request->range_end,
501         transfer->read_position);
502   }
503
504   return http_headers;
505 }
506
507 static void
508 on_request_sent (GObject * source, GAsyncResult * result, gpointer user_data)
509 {
510   GTask *transfer_task = user_data;
511   DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
512
513   DownloadHelper *dh = transfer->dh;
514   DownloadRequest *request = transfer->request;
515   SoupMessage *msg = transfer->msg;
516   GError *error = NULL;
517
518   GInputStream *in =
519       _soup_session_send_finish ((SoupSession *) source, result, &error);
520
521   download_request_lock (request);
522
523   if (in == NULL) {
524     request->status_code = _soup_message_get_status (msg);
525
526     if (!g_cancellable_is_cancelled (transfer->cancellable)) {
527       GST_LOG ("request errored. Code %d URI %s range %" G_GINT64_FORMAT " %"
528           G_GINT64_FORMAT, request->status_code, request->uri,
529           request->range_start, request->range_end);
530
531       if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT)
532         request->state = DOWNLOAD_REQUEST_STATE_ERROR;
533       finish_transfer_task (dh, transfer_task, error);
534     } else {
535       /* Ignore error from cancelled operation */
536       g_error_free (error);
537       finish_transfer_task (dh, transfer_task, NULL);
538     }
539     download_request_unlock (request);
540
541     /* No async callback queued - the transfer is done */
542     finish_transfer_task (dh, transfer_task, error);
543     return;
544   }
545
546   /* If the state went back to UNSENT, we were cancelled so don't override it */
547   if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT &&
548       request->state != DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED) {
549
550     request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED;
551     request->status_code = _soup_message_get_status (msg);
552     request->headers = handle_response_headers (transfer);
553     GST_TRACE ("request URI %s range %" G_GINT64_FORMAT " %"
554         G_GINT64_FORMAT " headers: %" GST_PTR_FORMAT,
555         request->uri, request->range_start, request->range_end,
556         request->headers);
557
558     if (SOUP_STATUS_IS_SUCCESSFUL (request->status_code)
559         || SOUP_STATUS_IS_REDIRECTION (request->status_code)) {
560       request->state = DOWNLOAD_REQUEST_STATE_HEADERS_RECEIVED;
561       transfer_task_report_progress (transfer_task);
562     } else {
563       goto finish_transfer_error;
564     }
565   }
566
567   if (!new_read_buffer (transfer))
568     goto finish_transfer_error;
569
570   download_request_unlock (request);
571
572   g_main_context_push_thread_default (dh->transfer_context);
573   g_input_stream_read_all_async (in, transfer->read_buffer,
574       transfer->read_buffer_size, G_PRIORITY_DEFAULT, transfer->cancellable,
575       on_read_ready, transfer_task);
576   g_main_context_pop_thread_default (dh->transfer_context);
577
578   g_object_unref (in);
579   return;
580
581 finish_transfer_error:
582   request->download_end_time = gst_adaptive_demux_clock_get_time (dh->clock);
583
584   if (request->in_use && !g_cancellable_is_cancelled (transfer->cancellable)) {
585     GST_LOG ("request complete. Code %d URI %s range %" G_GINT64_FORMAT " %"
586         G_GINT64_FORMAT, _soup_message_get_status (msg), request->uri,
587         request->range_start, request->range_end);
588     if (request->state != DOWNLOAD_REQUEST_STATE_UNSENT)
589       request->state = DOWNLOAD_REQUEST_STATE_ERROR;
590   }
591
592   g_free (transfer->read_buffer);
593   transfer->read_buffer = NULL;
594
595   download_request_unlock (request);
596   finish_transfer_task (dh, transfer_task, NULL);
597   g_object_unref (in);
598 }
599
600 DownloadHelper *
601 downloadhelper_new (GstAdaptiveDemuxClock * clock)
602 {
603   DownloadHelper *dh = g_new0 (DownloadHelper, 1);
604
605   dh->transfer_context = g_main_context_new ();
606   dh->loop = g_main_loop_new (dh->transfer_context, FALSE);
607
608   dh->clock = gst_adaptive_demux_clock_ref (clock);
609
610   g_mutex_init (&dh->transfer_lock);
611   dh->active_transfers = g_array_new (FALSE, FALSE, sizeof (GTask *));
612
613   g_array_set_clear_func (dh->active_transfers,
614       (GDestroyNotify) (release_transfer_task_by_ref));
615
616   dh->transfer_requests =
617       g_async_queue_new_full ((GDestroyNotify) g_object_unref);
618   dh->transfer_requests_source = NULL;
619
620   /* libsoup 3.0 (not 2.74 or 3.1) dispatches using a single source attached
621    * when the session is created, so we need to ensure it matches here. */
622   g_main_context_push_thread_default (dh->transfer_context);
623
624   /* Set 10 second timeout. Any longer is likely
625    * an attempt to reuse an already closed connection */
626   dh->session = _soup_session_new_with_options ("timeout", 10, NULL);
627
628   g_main_context_pop_thread_default (dh->transfer_context);
629
630   return dh;
631 }
632
633 void
634 downloadhelper_free (DownloadHelper * dh)
635 {
636   downloadhelper_stop (dh);
637
638   if (dh->session)
639     g_object_unref (dh->session);
640   g_main_loop_unref (dh->loop);
641   g_main_context_unref (dh->transfer_context);
642
643   if (dh->clock)
644     gst_adaptive_demux_clock_unref (dh->clock);
645
646   g_array_free (dh->active_transfers, TRUE);
647   g_async_queue_unref (dh->transfer_requests);
648
649   g_free (dh->referer);
650   g_free (dh->user_agent);
651   g_strfreev (dh->cookies);
652
653   g_free (dh);
654 }
655
656 void
657 downloadhelper_set_referer (DownloadHelper * dh, const gchar * referer)
658 {
659   g_mutex_lock (&dh->transfer_lock);
660   g_free (dh->referer);
661   dh->referer = g_strdup (referer);
662   g_mutex_unlock (&dh->transfer_lock);
663 }
664
665 void
666 downloadhelper_set_user_agent (DownloadHelper * dh, const gchar * user_agent)
667 {
668   g_mutex_lock (&dh->transfer_lock);
669   g_free (dh->user_agent);
670   dh->user_agent = g_strdup (user_agent);
671   g_mutex_unlock (&dh->transfer_lock);
672 }
673
674 /* Takes ownership of the strv */
675 void
676 downloadhelper_set_cookies (DownloadHelper * dh, gchar ** cookies)
677 {
678   g_mutex_lock (&dh->transfer_lock);
679   g_strfreev (dh->cookies);
680   dh->cookies = cookies;
681   g_mutex_unlock (&dh->transfer_lock);
682 }
683
684 /* Called with the transfer lock held */
685 static void
686 submit_transfer (DownloadHelper * dh, GTask * transfer_task)
687 {
688   DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
689   DownloadRequest *request = transfer->request;
690
691   download_request_lock (request);
692   request->state = DOWNLOAD_REQUEST_STATE_OPEN;
693   request->download_request_time =
694       gst_adaptive_demux_clock_get_time (dh->clock);
695
696   GST_LOG ("Submitting request URI %s range %" G_GINT64_FORMAT " %"
697       G_GINT64_FORMAT, request->uri, request->range_start, request->range_end);
698
699   transfer_task_report_progress (transfer_task);
700   download_request_unlock (request);
701
702   _soup_session_send_async (dh->session, transfer->msg, transfer->cancellable,
703       on_request_sent, transfer_task);
704   g_array_append_val (dh->active_transfers, transfer_task);
705 }
706
707 /* Idle callback that submits all pending transfers */
708 static gboolean
709 submit_transfers_cb (DownloadHelper * dh)
710 {
711   GTask *transfer;
712
713   g_mutex_lock (&dh->transfer_lock);
714   do {
715     transfer = g_async_queue_try_pop (dh->transfer_requests);
716     if (transfer) {
717       submit_transfer (dh, transfer);
718     }
719   } while (transfer != NULL);
720
721   /* FIXME: Use a PollFD like GWakeup instead? */
722   g_source_destroy (dh->transfer_requests_source);
723   g_source_unref (dh->transfer_requests_source);
724   dh->transfer_requests_source = NULL;
725
726   g_mutex_unlock (&dh->transfer_lock);
727
728   return G_SOURCE_REMOVE;
729 }
730
731 static gpointer
732 dh_transfer_thread_func (gpointer data)
733 {
734   DownloadHelper *dh = data;
735   GST_DEBUG ("DownloadHelper thread starting");
736
737   g_main_context_push_thread_default (dh->transfer_context);
738   g_main_loop_run (dh->loop);
739   g_main_context_pop_thread_default (dh->transfer_context);
740
741   GST_DEBUG ("Exiting DownloadHelper thread");
742   return NULL;
743 }
744
745 gboolean
746 downloadhelper_start (DownloadHelper * dh)
747 {
748   g_return_val_if_fail (dh->transfer_thread == NULL, FALSE);
749
750   g_mutex_lock (&dh->transfer_lock);
751   if (!dh->running) {
752
753     dh->transfer_thread =
754         g_thread_try_new ("adaptive-download-task", dh_transfer_thread_func, dh,
755         NULL);
756     dh->running = (dh->transfer_thread != NULL);
757   }
758   g_mutex_unlock (&dh->transfer_lock);
759
760   return dh->running;
761 }
762
763 void
764 downloadhelper_stop (DownloadHelper * dh)
765 {
766   int i;
767   GThread *transfer_thread = NULL;
768
769   GST_DEBUG ("Stopping DownloadHelper loop");
770
771   g_mutex_lock (&dh->transfer_lock);
772
773   dh->running = FALSE;
774
775   for (i = 0; i < dh->active_transfers->len; i++) {
776     GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i);
777     DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
778     g_cancellable_cancel (transfer->cancellable);
779   }
780
781   g_main_loop_quit (dh->loop);
782
783   transfer_thread = dh->transfer_thread;
784   dh->transfer_thread = NULL;
785
786   g_mutex_unlock (&dh->transfer_lock);
787
788   if (transfer_thread != NULL) {
789     g_thread_join (transfer_thread);
790   }
791
792   /* The transfer thread has exited at this point - any remaining transfers are unfinished
793    * and need cleaning up */
794   g_mutex_lock (&dh->transfer_lock);
795
796   for (i = 0; i < dh->active_transfers->len; i++) {
797     GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i);
798     DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
799     DownloadRequest *request = transfer->request;
800
801     download_request_lock (request);
802     /* Reset the state to UNSENT, to indicate cancellation, like an XMLHttpRequest does */
803     request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
804     download_request_unlock (request);
805
806     transfer->complete = TRUE;
807     if (transfer->blocking)
808       g_cond_broadcast (&transfer->cond);
809     else
810       g_task_return_boolean (transfer_task, TRUE);
811   }
812
813   g_array_set_size (dh->active_transfers, 0);
814   g_mutex_unlock (&dh->transfer_lock);
815 }
816
817 gboolean
818 downloadhelper_submit_request (DownloadHelper * dh,
819     const gchar * referer, DownloadFlags flags, DownloadRequest * request,
820     GError ** err)
821 {
822   GTask *transfer_task = NULL;
823   const gchar *method;
824   SoupMessage *msg;
825   SoupMessageHeaders *msg_headers;
826   gboolean blocking = (flags & DOWNLOAD_FLAG_BLOCKING) != 0;
827
828   method =
829       (flags & DOWNLOAD_FLAG_HEADERS_ONLY) ? SOUP_METHOD_HEAD : SOUP_METHOD_GET;
830
831   download_request_lock (request);
832   if (request->in_use) {
833     GST_ERROR ("Request for URI %s reusing active request object",
834         request->uri);
835     download_request_unlock (request);
836     return FALSE;
837   }
838
839   /* Clear the state back to unsent */
840   request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
841
842   msg = _soup_message_new (method, request->uri);
843   if (msg == NULL) {
844     g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
845         "Could not parse download URI %s", request->uri);
846
847     request->state = DOWNLOAD_REQUEST_STATE_ERROR;
848     download_request_unlock (request);
849
850     return FALSE;
851   }
852
853   /* NOTE: There was a bug where Akamai servers return the
854    * wrong result for a range request on small files. To avoid
855    * it if the range starts within the first KB of the file, just
856    * start at 0 instead */
857   if (request->range_start < 1024)
858     request->range_start = 0;
859
860   msg_headers = _soup_message_get_request_headers (msg);
861
862   if (request->range_start != 0 || request->range_end != -1) {
863     _soup_message_headers_set_range (msg_headers, request->range_start,
864         request->range_end);
865   }
866
867   download_request_unlock (request);
868
869   /* If resubmitting a request, clear any stale / unused data */
870   download_request_begin_download (request);
871
872   if ((flags & DOWNLOAD_FLAG_COMPRESS) == 0) {
873     _soup_message_disable_feature (msg, _soup_content_decoder_get_type ());
874   }
875   if (flags & DOWNLOAD_FLAG_FORCE_REFRESH) {
876     _soup_message_headers_append (msg_headers, "Cache-Control", "max-age=0");
877   }
878
879   /* Take the lock to protect header strings */
880   g_mutex_lock (&dh->transfer_lock);
881
882   if (referer != NULL) {
883     _soup_message_headers_append (msg_headers, "Referer", referer);
884   } else if (dh->referer != NULL) {
885     _soup_message_headers_append (msg_headers, "Referer", dh->referer);
886   }
887
888   if (dh->user_agent != NULL) {
889     _soup_message_headers_append (msg_headers, "User-Agent", dh->user_agent);
890   }
891
892   if (dh->cookies != NULL) {
893     gchar **cookie;
894
895     for (cookie = dh->cookies; *cookie != NULL; cookie++) {
896       _soup_message_headers_append (msg_headers, "Cookie", *cookie);
897     }
898   }
899
900   transfer_task = transfer_task_new (dh, request, msg, blocking);
901
902   if (!dh->running) {
903     /* The download helper was deactivated just as we went to dispatch this request.
904      * Abort and manually wake the request, as it never went in the active_transfer list */
905     g_mutex_unlock (&dh->transfer_lock);
906
907     download_request_lock (request);
908     request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
909     request->in_use = FALSE;
910     download_request_unlock (request);
911
912     g_cancellable_cancel (g_task_get_cancellable (transfer_task));
913     g_task_return_error_if_cancelled (transfer_task);
914     g_object_unref (transfer_task);
915
916     return FALSE;
917   }
918
919   download_request_lock (request);
920   request->in_use = TRUE;
921   download_request_unlock (request);
922
923   g_signal_connect (msg, "restarted", G_CALLBACK (soup_msg_restarted_cb),
924       transfer_task);
925
926   /* Now send the request over to the main loop for actual submission */
927   GST_LOG ("Submitting transfer task %p", transfer_task);
928   g_async_queue_push (dh->transfer_requests, transfer_task);
929
930   /* No pending idle source to wake the transfer loop - so create one */
931   if (dh->transfer_requests_source == NULL) {
932     dh->transfer_requests_source = g_idle_source_new ();
933     g_source_set_callback (dh->transfer_requests_source,
934         (GSourceFunc) submit_transfers_cb, dh, NULL);
935     g_source_attach (dh->transfer_requests_source, dh->transfer_context);
936   }
937
938   if (blocking) {
939     DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
940
941     /* We need an extra ref on the task to make sure it stays alive.
942      * We pushed it in the async queue, but didn't unlock yet, so while
943      * we gave away our ref, the receiver can't have unreffed it */
944     g_object_ref (transfer_task);
945     while (!transfer->complete)
946       g_cond_wait (&transfer->cond, &dh->transfer_lock);
947     g_object_unref (transfer_task);
948   }
949
950   g_mutex_unlock (&dh->transfer_lock);
951
952   return TRUE;
953 }
954
955 void
956 downloadhelper_cancel_request (DownloadHelper * dh, DownloadRequest * request)
957 {
958   int i;
959
960   g_mutex_lock (&dh->transfer_lock);
961
962   download_request_lock (request);
963   if (!request->in_use)
964     goto out;
965
966   GST_DEBUG ("Cancelling request for URI %s range %" G_GINT64_FORMAT " %"
967       G_GINT64_FORMAT, request->uri, request->range_start, request->range_end);
968
969   request->state = DOWNLOAD_REQUEST_STATE_UNSENT;
970
971   for (i = dh->active_transfers->len - 1; i >= 0; i--) {
972     GTask *transfer_task = g_array_index (dh->active_transfers, GTask *, i);
973     DownloadHelperTransfer *transfer = g_task_get_task_data (transfer_task);
974
975     if (transfer->request == request) {
976       GST_DEBUG ("Found transfer %p for request for URI %s range %"
977           G_GINT64_FORMAT " %" G_GINT64_FORMAT, transfer, request->uri,
978           request->range_start, request->range_end);
979       g_cancellable_cancel (transfer->cancellable);
980       break;
981     }
982   }
983
984 out:
985   download_request_unlock (request);
986   g_mutex_unlock (&dh->transfer_lock);
987 }
988
989 DownloadRequest *
990 downloadhelper_fetch_uri_range (DownloadHelper * dh, const gchar * uri,
991     const gchar * referer, DownloadFlags flags, gint64 range_start,
992     gint64 range_end, GError ** err)
993 {
994   DownloadRequest *request;
995
996   g_return_val_if_fail (uri != NULL, NULL);
997
998   GST_DEBUG ("Fetching URI %s range %" G_GINT64_FORMAT " %" G_GINT64_FORMAT,
999       uri, range_start, range_end);
1000
1001   flags |= DOWNLOAD_FLAG_BLOCKING;
1002
1003   request = download_request_new_uri_range (uri, range_start, range_end);
1004
1005   if (!downloadhelper_submit_request (dh, referer, flags, request, err)) {
1006     download_request_unref (request);
1007     return NULL;
1008   }
1009
1010   return request;
1011 }
1012
1013 DownloadRequest *
1014 downloadhelper_fetch_uri (DownloadHelper * dh, const gchar * uri,
1015     const gchar * referer, DownloadFlags flags, GError ** err)
1016 {
1017   return downloadhelper_fetch_uri_range (dh, uri, referer, flags, 0, -1, err);
1018 }