mssdemux: add connection-speed property
[platform/upstream/gstreamer.git] / ext / smoothstreaming / gstmssdemux.c
1 /* GStreamer
2  * Copyright (C) 2012 Smart TV Alliance
3  *  Author: Thiago Sousa Santos <thiago.sousa.santos@collabora.com>, Collabora Ltd.
4  *
5  * gstmssdemux.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:element-mssdemux
25  *
26  * Demuxes a Microsoft's Smooth Streaming manifest into its audio and/or video streams.
27  *
28  * TODO
29  */
30
31 #ifdef HAVE_CONFIG_H
32 #include "config.h"
33 #endif
34
35 #include "gst/gst-i18n-plugin.h"
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40
41 #include "gstmssdemux.h"
42
43 GST_DEBUG_CATEGORY (mssdemux_debug);
44
45 #define DEFAULT_CONNECTION_SPEED 0
46
47 enum
48 {
49   PROP_0,
50
51   PROP_CONNECTION_SPEED,
52   PROP_LAST
53 };
54
55 static GstStaticPadTemplate gst_mss_demux_sink_template =
56 GST_STATIC_PAD_TEMPLATE ("sink",
57     GST_PAD_SINK,
58     GST_PAD_ALWAYS,
59     GST_STATIC_CAPS ("application/vnd.ms-sstr+xml")
60     );
61
62 static GstStaticPadTemplate gst_mss_demux_videosrc_template =
63 GST_STATIC_PAD_TEMPLATE ("video_%02u",
64     GST_PAD_SRC,
65     GST_PAD_SOMETIMES,
66     GST_STATIC_CAPS_ANY);
67
68 static GstStaticPadTemplate gst_mss_demux_audiosrc_template =
69 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
70     GST_PAD_SRC,
71     GST_PAD_SOMETIMES,
72     GST_STATIC_CAPS_ANY);
73
74 GST_BOILERPLATE (GstMssDemux, gst_mss_demux, GstMssDemux, GST_TYPE_ELEMENT);
75
76 static void gst_mss_demux_dispose (GObject * object);
77 static void gst_mss_demux_set_property (GObject * object, guint prop_id,
78     const GValue * value, GParamSpec * pspec);
79 static void gst_mss_demux_get_property (GObject * object, guint prop_id,
80     GValue * value, GParamSpec * pspec);
81 static GstStateChangeReturn gst_mss_demux_change_state (GstElement * element,
82     GstStateChange transition);
83 static GstFlowReturn gst_mss_demux_chain (GstPad * pad, GstBuffer * buffer);
84 static GstFlowReturn gst_mss_demux_event (GstPad * pad, GstEvent * event);
85
86 static gboolean gst_mss_demux_src_query (GstPad * pad, GstQuery * query);
87
88 static void gst_mss_demux_stream_loop (GstMssDemuxStream * stream);
89
90 static void gst_mss_demux_process_manifest (GstMssDemux * mssdemux);
91
92 static void
93 gst_mss_demux_base_init (gpointer klass)
94 {
95   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
96
97   gst_element_class_add_static_pad_template (element_class,
98       &gst_mss_demux_sink_template);
99   gst_element_class_add_static_pad_template (element_class,
100       &gst_mss_demux_videosrc_template);
101   gst_element_class_add_static_pad_template (element_class,
102       &gst_mss_demux_audiosrc_template);
103   gst_element_class_set_details_simple (element_class, "Smooth Streaming "
104       "demuxer", "Demuxer",
105       "Parse and demultiplex a Smooth Streaming manifest into audio and video "
106       "streams", "Thiago Santos <thiago.sousa.santos@collabora.com>");
107
108   GST_DEBUG_CATEGORY_INIT (mssdemux_debug, "mssdemux", 0, "mssdemux plugin");
109 }
110
111 static void
112 gst_mss_demux_class_init (GstMssDemuxClass * klass)
113 {
114   GObjectClass *gobject_class;
115   GstElementClass *gstelement_class;
116
117   gobject_class = (GObjectClass *) klass;
118   gstelement_class = (GstElementClass *) klass;
119
120   parent_class = g_type_class_peek_parent (klass);
121
122   gobject_class->dispose = gst_mss_demux_dispose;
123   gobject_class->set_property = gst_mss_demux_set_property;
124   gobject_class->get_property = gst_mss_demux_get_property;
125
126   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
127       g_param_spec_uint64 ("connection-speed", "Connection Speed",
128           "Network connection speed in kbps (0 = unknown)",
129           0, G_MAXUINT64 / 1000, DEFAULT_CONNECTION_SPEED,
130           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
131
132   gstelement_class->change_state =
133       GST_DEBUG_FUNCPTR (gst_mss_demux_change_state);
134 }
135
136 static void
137 gst_mss_demux_init (GstMssDemux * mssdemux, GstMssDemuxClass * klass)
138 {
139   mssdemux->sinkpad =
140       gst_pad_new_from_static_template (&gst_mss_demux_sink_template, "sink");
141   gst_pad_set_chain_function (mssdemux->sinkpad,
142       GST_DEBUG_FUNCPTR (gst_mss_demux_chain));
143   gst_pad_set_event_function (mssdemux->sinkpad,
144       GST_DEBUG_FUNCPTR (gst_mss_demux_event));
145   gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), mssdemux->sinkpad);
146 }
147
148 static GstMssDemuxStream *
149 gst_mss_demux_stream_new (GstMssDemux * mssdemux,
150     GstMssStream * manifeststream, GstPad * srcpad)
151 {
152   GstMssDemuxStream *stream;
153
154   stream = g_new0 (GstMssDemuxStream, 1);
155   stream->downloader = gst_uri_downloader_new ();
156
157   /* Streaming task */
158   g_static_rec_mutex_init (&stream->stream_lock);
159   stream->stream_task =
160       gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, stream);
161   gst_task_set_lock (stream->stream_task, &stream->stream_lock);
162
163   stream->pad = srcpad;
164   stream->manifest_stream = manifeststream;
165   stream->parent = mssdemux;
166
167   return stream;
168 }
169
170 static void
171 gst_mss_demux_stream_free (GstMssDemuxStream * stream)
172 {
173   if (stream->stream_task) {
174     if (GST_TASK_STATE (stream->stream_task) != GST_TASK_STOPPED) {
175       GST_DEBUG_OBJECT (stream->parent, "Leaving streaming task %s:%s",
176           GST_DEBUG_PAD_NAME (stream->pad));
177       gst_task_stop (stream->stream_task);
178       g_static_rec_mutex_lock (&stream->stream_lock);
179       g_static_rec_mutex_unlock (&stream->stream_lock);
180       GST_LOG_OBJECT (stream->parent, "Waiting for task to finish");
181       gst_task_join (stream->stream_task);
182       GST_LOG_OBJECT (stream->parent, "Finished");
183     }
184     gst_object_unref (stream->stream_task);
185     g_static_rec_mutex_free (&stream->stream_lock);
186     stream->stream_task = NULL;
187   }
188
189   if (stream->pending_newsegment) {
190     gst_event_unref (stream->pending_newsegment);
191     stream->pending_newsegment = NULL;
192   }
193
194
195   if (stream->downloader != NULL) {
196     g_object_unref (stream->downloader);
197     stream->downloader = NULL;
198   }
199   if (stream->pad) {
200     gst_object_unref (stream->pad);
201     stream->pad = NULL;
202   }
203   g_free (stream);
204 }
205
206 static void
207 gst_mss_demux_reset (GstMssDemux * mssdemux)
208 {
209   GSList *iter;
210   if (mssdemux->manifest_buffer) {
211     gst_buffer_unref (mssdemux->manifest_buffer);
212     mssdemux->manifest_buffer = NULL;
213   }
214
215   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
216     GstMssDemuxStream *stream = iter->data;
217     gst_element_remove_pad (GST_ELEMENT_CAST (mssdemux), stream->pad);
218     gst_mss_demux_stream_free (stream);
219   }
220   g_slist_free (mssdemux->streams);
221   mssdemux->streams = NULL;
222
223   if (mssdemux->manifest) {
224     gst_mss_manifest_free (mssdemux->manifest);
225     mssdemux->manifest = NULL;
226   }
227
228   mssdemux->n_videos = mssdemux->n_audios = 0;
229   g_free (mssdemux->base_url);
230   mssdemux->base_url = NULL;
231 }
232
233 static void
234 gst_mss_demux_dispose (GObject * object)
235 {
236   /* GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (object); */
237
238   G_OBJECT_CLASS (parent_class)->dispose (object);
239 }
240
241 static void
242 gst_mss_demux_set_property (GObject * object, guint prop_id,
243     const GValue * value, GParamSpec * pspec)
244 {
245   GstMssDemux *mssdemux = GST_MSS_DEMUX (object);
246
247   switch (prop_id) {
248     case PROP_CONNECTION_SPEED:
249       mssdemux->connection_speed = g_value_get_uint (value) * 1000;
250       break;
251     default:
252       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
253       break;
254   }
255 }
256
257 static void
258 gst_mss_demux_get_property (GObject * object, guint prop_id, GValue * value,
259     GParamSpec * pspec)
260 {
261   GstMssDemux *mssdemux = GST_MSS_DEMUX (object);
262
263   switch (prop_id) {
264     case PROP_CONNECTION_SPEED:
265       g_value_set_uint (value, mssdemux->connection_speed / 1000);
266       break;
267     default:
268       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
269       break;
270   }
271 }
272
273 static GstStateChangeReturn
274 gst_mss_demux_change_state (GstElement * element, GstStateChange transition)
275 {
276   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (element);
277   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
278
279   switch (transition) {
280     case GST_STATE_CHANGE_PAUSED_TO_READY:
281       gst_mss_demux_reset (mssdemux);
282       break;
283     case GST_STATE_CHANGE_READY_TO_NULL:
284       break;
285     default:
286       break;
287   }
288
289   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
290
291   switch (transition) {
292     case GST_STATE_CHANGE_PAUSED_TO_READY:{
293       break;
294     }
295     default:
296       break;
297   }
298
299   return result;
300 }
301
302 static GstFlowReturn
303 gst_mss_demux_chain (GstPad * pad, GstBuffer * buffer)
304 {
305   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (GST_PAD_PARENT (pad));
306   if (mssdemux->manifest_buffer == NULL)
307     mssdemux->manifest_buffer = buffer;
308   else
309     mssdemux->manifest_buffer =
310         gst_buffer_join (mssdemux->manifest_buffer, buffer);
311
312   return GST_FLOW_OK;
313 }
314
315 static void
316 gst_mss_demux_start (GstMssDemux * mssdemux)
317 {
318   GSList *iter;
319
320   GST_INFO_OBJECT (mssdemux, "Starting streams' tasks");
321   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
322     GstMssDemuxStream *stream = iter->data;
323     gst_task_start (stream->stream_task);
324   }
325 }
326
327 static gboolean
328 gst_mss_demux_push_src_event (GstMssDemux * mssdemux, GstEvent * event)
329 {
330   GSList *iter;
331   gboolean ret = TRUE;
332
333   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
334     GstMssDemuxStream *stream = iter->data;
335     gst_event_ref (event);
336     ret = ret & gst_pad_push_event (stream->pad, event);
337   }
338   return ret;
339 }
340
341 static gboolean
342 gst_mss_demux_event (GstPad * pad, GstEvent * event)
343 {
344   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (GST_PAD_PARENT (pad));
345   gboolean forward = TRUE;
346   gboolean ret = TRUE;
347
348   switch (GST_EVENT_TYPE (event)) {
349     case GST_EVENT_EOS:
350       if (mssdemux->manifest_buffer == NULL) {
351         GST_WARNING_OBJECT (mssdemux, "Received EOS without a manifest.");
352         break;
353       }
354
355       gst_mss_demux_process_manifest (mssdemux);
356       gst_mss_demux_start (mssdemux);
357       forward = FALSE;
358       break;
359     default:
360       break;
361   }
362
363   if (forward) {
364     ret = gst_pad_event_default (pad, event);
365   } else {
366     gst_event_unref (event);
367   }
368
369   return ret;
370 }
371
372 static gboolean
373 gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
374 {
375   GstMssDemux *mssdemux;
376
377   mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad));
378
379   switch (event->type) {
380     case GST_EVENT_SEEK:
381     {
382       gdouble rate;
383       GstFormat format;
384       GstSeekFlags flags;
385       GstSeekType start_type, stop_type;
386       gint64 start, stop;
387       GstEvent *newsegment;
388       GSList *iter;
389
390       GST_INFO_OBJECT (mssdemux, "Received GST_EVENT_SEEK");
391
392       gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
393           &stop_type, &stop);
394
395       if (format != GST_FORMAT_TIME)
396         return FALSE;
397
398       GST_DEBUG_OBJECT (mssdemux,
399           "seek event, rate: %f start: %" GST_TIME_FORMAT " stop: %"
400           GST_TIME_FORMAT, rate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
401
402       if (flags & GST_SEEK_FLAG_FLUSH) {
403         GstEvent *flush = gst_event_new_flush_start ();
404         GST_DEBUG_OBJECT (mssdemux, "sending flush start");
405
406         gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
407         gst_mss_demux_push_src_event (mssdemux, flush);
408         gst_event_unref (flush);
409       }
410
411       /* stop the tasks */
412       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
413         GstMssDemuxStream *stream = iter->data;
414
415         gst_uri_downloader_cancel (stream->downloader);
416         gst_task_pause (stream->stream_task);
417       }
418       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
419         GstMssDemuxStream *stream = iter->data;
420         g_static_rec_mutex_lock (&stream->stream_lock);
421       }
422
423       if (!gst_mss_manifest_seek (mssdemux->manifest, start)) {;
424         GST_WARNING_OBJECT (mssdemux, "Could not find seeked fragment");
425         return FALSE;
426       }
427
428       newsegment =
429           gst_event_new_new_segment (FALSE, rate, format, start, stop, start);
430       gst_event_set_seqnum (newsegment, gst_event_get_seqnum (event));
431       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
432         GstMssDemuxStream *stream = iter->data;
433
434         stream->pending_newsegment = gst_event_ref (newsegment);
435       }
436       gst_event_unref (newsegment);
437
438       if (flags & GST_SEEK_FLAG_FLUSH) {
439         GstEvent *flush = gst_event_new_flush_stop ();
440         GST_DEBUG_OBJECT (mssdemux, "sending flush stop");
441
442         gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
443         gst_mss_demux_push_src_event (mssdemux, flush);
444         gst_event_unref (flush);
445       }
446
447       /* restart tasks */
448       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
449         GstMssDemuxStream *stream = iter->data;
450         g_static_rec_mutex_unlock (&stream->stream_lock);
451       }
452       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
453         GstMssDemuxStream *stream = iter->data;
454
455         gst_task_start (stream->stream_task);
456       }
457
458       return TRUE;
459     }
460     default:
461       break;
462   }
463
464   return gst_pad_event_default (pad, event);
465 }
466
467 static gboolean
468 gst_mss_demux_src_query (GstPad * pad, GstQuery * query)
469 {
470   GstMssDemux *mssdemux;
471   gboolean ret = FALSE;
472
473   if (query == NULL)
474     return FALSE;
475
476   mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad));
477
478   switch (query->type) {
479     case GST_QUERY_DURATION:{
480       GstClockTime duration = -1;
481       GstFormat fmt;
482
483       gst_query_parse_duration (query, &fmt, NULL);
484       if (fmt == GST_FORMAT_TIME && mssdemux->manifest) {
485         /* TODO should we use the streams accumulated duration or the main manifest duration? */
486         duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
487
488         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
489           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
490           ret = TRUE;
491         }
492       }
493       GST_INFO_OBJECT (mssdemux, "GST_QUERY_DURATION returns %s with duration %"
494           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
495       break;
496     }
497     case GST_QUERY_LATENCY:
498       gst_query_set_latency (query, FALSE, 0, -1);
499       ret = TRUE;
500       break;
501     case GST_QUERY_SEEKING:{
502       GstFormat fmt;
503       gint64 stop = -1;
504
505       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
506       GST_INFO_OBJECT (mssdemux, "Received GST_QUERY_SEEKING with format %d",
507           fmt);
508       if (fmt == GST_FORMAT_TIME) {
509         GstClockTime duration;
510         duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
511         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
512           stop = duration;
513         gst_query_set_seeking (query, fmt, TRUE, 0, stop);
514         ret = TRUE;
515         GST_INFO_OBJECT (mssdemux, "GST_QUERY_SEEKING returning with stop : %"
516             GST_TIME_FORMAT, GST_TIME_ARGS (stop));
517       }
518       break;
519     }
520     default:
521       /* Don't fordward queries upstream because of the special nature of this
522        *  "demuxer", which relies on the upstream element only to be fed
523        *  the Manifest
524        */
525       break;
526   }
527
528   return ret;
529 }
530
531 static void
532 _set_src_pad_functions (GstPad * pad)
533 {
534   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_query));
535   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_event));
536 }
537
538 static void
539 gst_mss_demux_create_streams (GstMssDemux * mssdemux)
540 {
541   GSList *streams = gst_mss_manifest_get_streams (mssdemux->manifest);
542   GSList *iter;
543
544   if (streams == NULL) {
545     GST_INFO_OBJECT (mssdemux, "No streams found in the manifest");
546     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
547         (_("This file contains no playable streams.")),
548         ("no streams found at the Manifest"));
549     return;
550   }
551
552   for (iter = streams; iter; iter = g_slist_next (iter)) {
553     gchar *name;
554     GstPad *srcpad = NULL;
555     GstMssDemuxStream *stream = NULL;
556     GstMssStream *manifeststream = iter->data;
557     GstMssStreamType streamtype;
558
559     streamtype = gst_mss_stream_get_type (manifeststream);
560     GST_DEBUG_OBJECT (mssdemux, "Found stream of type: %s",
561         gst_mss_stream_type_name (streamtype));
562
563     /* TODO use stream's name as the pad name? */
564     if (streamtype == MSS_STREAM_TYPE_VIDEO) {
565       name = g_strdup_printf ("video_%02u", mssdemux->n_videos++);
566       srcpad =
567           gst_pad_new_from_static_template (&gst_mss_demux_videosrc_template,
568           name);
569       g_free (name);
570     } else if (streamtype == MSS_STREAM_TYPE_AUDIO) {
571       name = g_strdup_printf ("audio_%02u", mssdemux->n_audios++);
572       srcpad =
573           gst_pad_new_from_static_template (&gst_mss_demux_audiosrc_template,
574           name);
575       g_free (name);
576     }
577
578     if (!srcpad) {
579       GST_WARNING_OBJECT (mssdemux, "Ignoring unknown type stream");
580       continue;
581     }
582
583     _set_src_pad_functions (srcpad);
584
585     stream = gst_mss_demux_stream_new (mssdemux, manifeststream, srcpad);
586     gst_mss_stream_set_active (manifeststream, TRUE);
587     mssdemux->streams = g_slist_append (mssdemux->streams, stream);
588   }
589
590   /* select initial bitrates */
591   gst_mss_manifest_change_bitrate (mssdemux->manifest,
592       mssdemux->connection_speed);
593 }
594
595 static gboolean
596 gst_mss_demux_expose_stream (GstMssDemux * mssdemux, GstMssDemuxStream * stream)
597 {
598   GstCaps *caps;
599   GstCaps *media_caps;
600   GstPad *pad = stream->pad;
601
602   media_caps = gst_mss_stream_get_caps (stream->manifest_stream);
603
604   if (media_caps) {
605     caps = gst_caps_new_simple ("video/quicktime", "variant", G_TYPE_STRING,
606         "mss-fragmented", "timescale", G_TYPE_UINT64,
607         gst_mss_stream_get_timescale (stream->manifest_stream), "media-caps",
608         GST_TYPE_CAPS, media_caps, NULL);
609     gst_caps_unref (media_caps);
610     gst_pad_set_caps (pad, caps);
611     gst_caps_unref (caps);
612
613     gst_pad_set_active (pad, TRUE);
614     GST_INFO_OBJECT (mssdemux, "Adding srcpad %s:%s with caps %" GST_PTR_FORMAT,
615         GST_DEBUG_PAD_NAME (pad), caps);
616     gst_object_ref (pad);
617     gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), pad);
618   } else {
619     GST_WARNING_OBJECT (mssdemux,
620         "Couldn't get caps from manifest stream %p %s, not exposing it", stream,
621         GST_PAD_NAME (stream->pad));
622     return FALSE;
623   }
624   return TRUE;
625 }
626
627 static void
628 gst_mss_demux_process_manifest (GstMssDemux * mssdemux)
629 {
630   GstQuery *query;
631   gchar *uri = NULL;
632   gboolean ret;
633   GSList *iter;
634
635   g_return_if_fail (mssdemux->manifest_buffer != NULL);
636   g_return_if_fail (mssdemux->manifest == NULL);
637
638   query = gst_query_new_uri ();
639   ret = gst_pad_peer_query (mssdemux->sinkpad, query);
640   if (ret) {
641     gchar *baseurl_end;
642     gst_query_parse_uri (query, &uri);
643     GST_INFO_OBJECT (mssdemux, "Upstream is using URI: %s", uri);
644
645     baseurl_end = g_strrstr (uri, "/Manifest");
646     if (baseurl_end) {
647       /* set the new end of the string */
648       baseurl_end[0] = '\0';
649     } else {
650       GST_WARNING_OBJECT (mssdemux, "Stream's URI didn't end with /manifest");
651     }
652
653     mssdemux->base_url = uri;
654   }
655   gst_query_unref (query);
656
657   mssdemux->manifest = gst_mss_manifest_new (mssdemux->manifest_buffer);
658   if (!mssdemux->manifest) {
659     GST_ELEMENT_ERROR (mssdemux, STREAM, FORMAT, ("Bad manifest file"),
660         ("Xml manifest file couldn't be parsed"));
661     return;
662   }
663
664   gst_mss_demux_create_streams (mssdemux);
665   for (iter = mssdemux->streams; iter;) {
666     GSList *current = iter;
667     GstMssDemuxStream *stream = iter->data;
668     iter = g_slist_next (iter); /* do it ourselves as we want it done in the beginning of the loop */
669     if (!gst_mss_demux_expose_stream (mssdemux, stream)) {
670       gst_mss_demux_stream_free (stream);
671       mssdemux->streams = g_slist_delete_link (mssdemux->streams, current);
672     }
673   }
674
675   if (!mssdemux->streams) {
676     /* no streams */
677     GST_WARNING_OBJECT (mssdemux, "Couldn't identify the caps for any of the "
678         "streams found in the manifest");
679     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
680         (_("This file contains no playable streams.")),
681         ("No known stream formats found at the Manifest"));
682     return;
683   }
684
685   gst_element_no_more_pads (GST_ELEMENT_CAST (mssdemux));
686 }
687
688 static void
689 gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
690 {
691   GstMssDemux *mssdemux = stream->parent;
692   gchar *path;
693   gchar *url;
694   GstFragment *fragment;
695   GstBuffer *buffer;
696   GstFlowReturn ret;
697
698   GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream);
699   ret = gst_mss_stream_get_fragment_url (stream->manifest_stream, &path);
700   switch (ret) {
701     case GST_FLOW_OK:
702       break;                    /* all is good, let's go */
703     case GST_FLOW_UNEXPECTED:  /* EOS */
704       goto eos;
705     case GST_FLOW_ERROR:
706       goto error;
707     default:
708       break;
709   }
710   if (!path) {
711     goto no_url_error;
712   }
713   GST_DEBUG_OBJECT (mssdemux, "Got url path '%s' for stream %p", path, stream);
714
715   url = g_strdup_printf ("%s/%s", mssdemux->base_url, path);
716
717   fragment = gst_uri_downloader_fetch_uri (stream->downloader, url);
718   g_free (path);
719   g_free (url);
720
721   if (!fragment) {
722     GST_INFO_OBJECT (mssdemux, "No fragment downloaded");
723     /* TODO check if we are truly stoping */
724     return;
725   }
726
727   buffer = gst_fragment_get_buffer (fragment);
728   buffer = gst_buffer_make_metadata_writable (buffer);
729   gst_buffer_set_caps (buffer, GST_PAD_CAPS (stream->pad));
730   GST_BUFFER_TIMESTAMP (buffer) =
731       gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
732   GST_BUFFER_DURATION (buffer) =
733       gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
734
735   if (G_UNLIKELY (stream->pending_newsegment)) {
736     gst_pad_push_event (stream->pad, stream->pending_newsegment);
737     stream->pending_newsegment = NULL;
738   }
739
740   GST_DEBUG_OBJECT (mssdemux, "Pushing buffer of size %u on pad %s",
741       GST_BUFFER_SIZE (buffer), GST_PAD_NAME (stream->pad));
742   ret = gst_pad_push (stream->pad, buffer);
743   switch (ret) {
744     case GST_FLOW_UNEXPECTED:
745       goto eos;                 /* EOS ? */
746     case GST_FLOW_ERROR:
747       goto error;
748     case GST_FLOW_NOT_LINKED:
749       break;                    /* TODO what to do here? pause the task or just keep pushing? */
750     case GST_FLOW_OK:
751     default:
752       break;
753   }
754
755   gst_mss_stream_advance_fragment (stream->manifest_stream);
756   return;
757
758 eos:
759   {
760     GstEvent *eos = gst_event_new_eos ();
761     GST_DEBUG_OBJECT (mssdemux, "Pushing EOS on pad %s:%s",
762         GST_DEBUG_PAD_NAME (stream->pad));
763     gst_pad_push_event (stream->pad, eos);
764     gst_task_stop (stream->stream_task);
765     return;
766   }
767 no_url_error:
768   {
769     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
770         (_("Failed to get fragment URL.")),
771         ("An error happened when getting fragment URL"));
772     gst_task_stop (stream->stream_task);
773     return;
774   }
775 error:
776   {
777     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
778     gst_task_stop (stream->stream_task);
779     return;
780   }
781 }