9747601f4d0d84f4481d21c40c34588898d8b014
[platform/upstream/gstreamer.git] / ext / smoothstreaming / gstmssdemux.c
1 /* GStreamer
2  * Copyright (C) 2012 Smart TV Alliance
3  *  Author: Thiago Sousa Santos <thiago.sousa.santos@collabora.com>, Collabora Ltd.
4  *
5  * gstmssdemux.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:element-mssdemux
25  *
26  * Demuxes a Microsoft's Smooth Streaming manifest into its audio and/or video streams.
27  *
28  * TODO
29  */
30
31 #ifdef HAVE_CONFIG_H
32 #include "config.h"
33 #endif
34
35 #include "gst/gst-i18n-plugin.h"
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40
41 #include "gstmssdemux.h"
42
43 GST_DEBUG_CATEGORY (mssdemux_debug);
44
45 #define DEFAULT_CONNECTION_SPEED 0
46
47 enum
48 {
49   PROP_0,
50
51   PROP_CONNECTION_SPEED,
52   PROP_LAST
53 };
54
55 static GstStaticPadTemplate gst_mss_demux_sink_template =
56 GST_STATIC_PAD_TEMPLATE ("sink",
57     GST_PAD_SINK,
58     GST_PAD_ALWAYS,
59     GST_STATIC_CAPS ("application/vnd.ms-sstr+xml")
60     );
61
62 static GstStaticPadTemplate gst_mss_demux_videosrc_template =
63 GST_STATIC_PAD_TEMPLATE ("video_%02u",
64     GST_PAD_SRC,
65     GST_PAD_SOMETIMES,
66     GST_STATIC_CAPS_ANY);
67
68 static GstStaticPadTemplate gst_mss_demux_audiosrc_template =
69 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
70     GST_PAD_SRC,
71     GST_PAD_SOMETIMES,
72     GST_STATIC_CAPS_ANY);
73
74 GST_BOILERPLATE (GstMssDemux, gst_mss_demux, GstMssDemux, GST_TYPE_ELEMENT);
75
76 static void gst_mss_demux_dispose (GObject * object);
77 static void gst_mss_demux_set_property (GObject * object, guint prop_id,
78     const GValue * value, GParamSpec * pspec);
79 static void gst_mss_demux_get_property (GObject * object, guint prop_id,
80     GValue * value, GParamSpec * pspec);
81 static GstStateChangeReturn gst_mss_demux_change_state (GstElement * element,
82     GstStateChange transition);
83 static GstFlowReturn gst_mss_demux_chain (GstPad * pad, GstBuffer * buffer);
84 static GstFlowReturn gst_mss_demux_event (GstPad * pad, GstEvent * event);
85
86 static gboolean gst_mss_demux_src_query (GstPad * pad, GstQuery * query);
87
88 static void gst_mss_demux_stream_loop (GstMssDemuxStream * stream);
89
90 static void gst_mss_demux_process_manifest (GstMssDemux * mssdemux);
91
92 static void
93 gst_mss_demux_base_init (gpointer klass)
94 {
95   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
96
97   gst_element_class_add_static_pad_template (element_class,
98       &gst_mss_demux_sink_template);
99   gst_element_class_add_static_pad_template (element_class,
100       &gst_mss_demux_videosrc_template);
101   gst_element_class_add_static_pad_template (element_class,
102       &gst_mss_demux_audiosrc_template);
103   gst_element_class_set_details_simple (element_class, "Smooth Streaming "
104       "demuxer", "Demuxer",
105       "Parse and demultiplex a Smooth Streaming manifest into audio and video "
106       "streams", "Thiago Santos <thiago.sousa.santos@collabora.com>");
107
108   GST_DEBUG_CATEGORY_INIT (mssdemux_debug, "mssdemux", 0, "mssdemux plugin");
109 }
110
111 static void
112 gst_mss_demux_class_init (GstMssDemuxClass * klass)
113 {
114   GObjectClass *gobject_class;
115   GstElementClass *gstelement_class;
116
117   gobject_class = (GObjectClass *) klass;
118   gstelement_class = (GstElementClass *) klass;
119
120   parent_class = g_type_class_peek_parent (klass);
121
122   gobject_class->dispose = gst_mss_demux_dispose;
123   gobject_class->set_property = gst_mss_demux_set_property;
124   gobject_class->get_property = gst_mss_demux_get_property;
125
126   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
127       g_param_spec_uint ("connection-speed", "Connection Speed",
128           "Network connection speed in kbps (0 = unknown)",
129           0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
130           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
131
132   gstelement_class->change_state =
133       GST_DEBUG_FUNCPTR (gst_mss_demux_change_state);
134 }
135
136 static void
137 gst_mss_demux_init (GstMssDemux * mssdemux, GstMssDemuxClass * klass)
138 {
139   mssdemux->sinkpad =
140       gst_pad_new_from_static_template (&gst_mss_demux_sink_template, "sink");
141   gst_pad_set_chain_function (mssdemux->sinkpad,
142       GST_DEBUG_FUNCPTR (gst_mss_demux_chain));
143   gst_pad_set_event_function (mssdemux->sinkpad,
144       GST_DEBUG_FUNCPTR (gst_mss_demux_event));
145   gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), mssdemux->sinkpad);
146 }
147
148 static GstMssDemuxStream *
149 gst_mss_demux_stream_new (GstMssDemux * mssdemux,
150     GstMssStream * manifeststream, GstPad * srcpad)
151 {
152   GstMssDemuxStream *stream;
153
154   stream = g_new0 (GstMssDemuxStream, 1);
155   stream->downloader = gst_uri_downloader_new ();
156
157   /* Streaming task */
158   g_static_rec_mutex_init (&stream->stream_lock);
159   stream->stream_task =
160       gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, stream);
161   gst_task_set_lock (stream->stream_task, &stream->stream_lock);
162
163   stream->pad = srcpad;
164   stream->manifest_stream = manifeststream;
165   stream->parent = mssdemux;
166
167   return stream;
168 }
169
170 static void
171 gst_mss_demux_stream_free (GstMssDemuxStream * stream)
172 {
173   if (stream->stream_task) {
174     if (GST_TASK_STATE (stream->stream_task) != GST_TASK_STOPPED) {
175       GST_DEBUG_OBJECT (stream->parent, "Leaving streaming task %s:%s",
176           GST_DEBUG_PAD_NAME (stream->pad));
177       gst_task_stop (stream->stream_task);
178       g_static_rec_mutex_lock (&stream->stream_lock);
179       g_static_rec_mutex_unlock (&stream->stream_lock);
180       GST_LOG_OBJECT (stream->parent, "Waiting for task to finish");
181       gst_task_join (stream->stream_task);
182       GST_LOG_OBJECT (stream->parent, "Finished");
183     }
184     gst_object_unref (stream->stream_task);
185     g_static_rec_mutex_free (&stream->stream_lock);
186     stream->stream_task = NULL;
187   }
188
189   if (stream->pending_newsegment) {
190     gst_event_unref (stream->pending_newsegment);
191     stream->pending_newsegment = NULL;
192   }
193
194
195   if (stream->downloader != NULL) {
196     g_object_unref (stream->downloader);
197     stream->downloader = NULL;
198   }
199   if (stream->pad) {
200     gst_object_unref (stream->pad);
201     stream->pad = NULL;
202   }
203   g_free (stream);
204 }
205
206 static void
207 gst_mss_demux_reset (GstMssDemux * mssdemux)
208 {
209   GSList *iter;
210   if (mssdemux->manifest_buffer) {
211     gst_buffer_unref (mssdemux->manifest_buffer);
212     mssdemux->manifest_buffer = NULL;
213   }
214
215   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
216     GstMssDemuxStream *stream = iter->data;
217     gst_element_remove_pad (GST_ELEMENT_CAST (mssdemux), stream->pad);
218     gst_mss_demux_stream_free (stream);
219   }
220   g_slist_free (mssdemux->streams);
221   mssdemux->streams = NULL;
222
223   if (mssdemux->manifest) {
224     gst_mss_manifest_free (mssdemux->manifest);
225     mssdemux->manifest = NULL;
226   }
227
228   mssdemux->n_videos = mssdemux->n_audios = 0;
229   g_free (mssdemux->base_url);
230   mssdemux->base_url = NULL;
231 }
232
233 static void
234 gst_mss_demux_dispose (GObject * object)
235 {
236   /* GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (object); */
237
238   G_OBJECT_CLASS (parent_class)->dispose (object);
239 }
240
241 static void
242 gst_mss_demux_set_property (GObject * object, guint prop_id,
243     const GValue * value, GParamSpec * pspec)
244 {
245   GstMssDemux *mssdemux = GST_MSS_DEMUX (object);
246
247   switch (prop_id) {
248     case PROP_CONNECTION_SPEED:
249       GST_OBJECT_LOCK (mssdemux);
250       mssdemux->connection_speed = g_value_get_uint (value) * 1000;
251       mssdemux->update_bitrates = TRUE;
252       GST_DEBUG_OBJECT (mssdemux, "Connection speed set to %llu",
253           mssdemux->connection_speed);
254       GST_OBJECT_UNLOCK (mssdemux);
255       break;
256     default:
257       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
258       break;
259   }
260 }
261
262 static void
263 gst_mss_demux_get_property (GObject * object, guint prop_id, GValue * value,
264     GParamSpec * pspec)
265 {
266   GstMssDemux *mssdemux = GST_MSS_DEMUX (object);
267
268   switch (prop_id) {
269     case PROP_CONNECTION_SPEED:
270       g_value_set_uint (value, mssdemux->connection_speed / 1000);
271       break;
272     default:
273       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
274       break;
275   }
276 }
277
278 static GstStateChangeReturn
279 gst_mss_demux_change_state (GstElement * element, GstStateChange transition)
280 {
281   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (element);
282   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
283
284   switch (transition) {
285     case GST_STATE_CHANGE_PAUSED_TO_READY:
286       gst_mss_demux_reset (mssdemux);
287       break;
288     case GST_STATE_CHANGE_READY_TO_NULL:
289       break;
290     default:
291       break;
292   }
293
294   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
295
296   switch (transition) {
297     case GST_STATE_CHANGE_PAUSED_TO_READY:{
298       break;
299     }
300     default:
301       break;
302   }
303
304   return result;
305 }
306
307 static GstFlowReturn
308 gst_mss_demux_chain (GstPad * pad, GstBuffer * buffer)
309 {
310   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (GST_PAD_PARENT (pad));
311   if (mssdemux->manifest_buffer == NULL)
312     mssdemux->manifest_buffer = buffer;
313   else
314     mssdemux->manifest_buffer =
315         gst_buffer_join (mssdemux->manifest_buffer, buffer);
316
317   return GST_FLOW_OK;
318 }
319
320 static void
321 gst_mss_demux_start (GstMssDemux * mssdemux)
322 {
323   GSList *iter;
324
325   GST_INFO_OBJECT (mssdemux, "Starting streams' tasks");
326   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
327     GstMssDemuxStream *stream = iter->data;
328     gst_task_start (stream->stream_task);
329   }
330 }
331
332 static gboolean
333 gst_mss_demux_push_src_event (GstMssDemux * mssdemux, GstEvent * event)
334 {
335   GSList *iter;
336   gboolean ret = TRUE;
337
338   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
339     GstMssDemuxStream *stream = iter->data;
340     gst_event_ref (event);
341     ret = ret & gst_pad_push_event (stream->pad, event);
342   }
343   return ret;
344 }
345
346 static gboolean
347 gst_mss_demux_event (GstPad * pad, GstEvent * event)
348 {
349   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (GST_PAD_PARENT (pad));
350   gboolean forward = TRUE;
351   gboolean ret = TRUE;
352
353   switch (GST_EVENT_TYPE (event)) {
354     case GST_EVENT_EOS:
355       if (mssdemux->manifest_buffer == NULL) {
356         GST_WARNING_OBJECT (mssdemux, "Received EOS without a manifest.");
357         break;
358       }
359
360       gst_mss_demux_process_manifest (mssdemux);
361       gst_mss_demux_start (mssdemux);
362       forward = FALSE;
363       break;
364     default:
365       break;
366   }
367
368   if (forward) {
369     ret = gst_pad_event_default (pad, event);
370   } else {
371     gst_event_unref (event);
372   }
373
374   return ret;
375 }
376
377 static void
378 gst_mss_demux_stop_tasks (GstMssDemux * mssdemux, gboolean immediate)
379 {
380   GSList *iter;
381   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
382     GstMssDemuxStream *stream = iter->data;
383
384     if (immediate)
385       gst_uri_downloader_cancel (stream->downloader);
386     gst_task_pause (stream->stream_task);
387   }
388   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
389     GstMssDemuxStream *stream = iter->data;
390     g_static_rec_mutex_lock (&stream->stream_lock);
391   }
392 }
393
394 static void
395 gst_mss_demux_restart_tasks (GstMssDemux * mssdemux)
396 {
397   GSList *iter;
398   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
399     GstMssDemuxStream *stream = iter->data;
400     g_static_rec_mutex_unlock (&stream->stream_lock);
401   }
402   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
403     GstMssDemuxStream *stream = iter->data;
404
405     gst_task_start (stream->stream_task);
406   }
407 }
408
409 static gboolean
410 gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
411 {
412   GstMssDemux *mssdemux;
413
414   mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad));
415
416   switch (event->type) {
417     case GST_EVENT_SEEK:
418     {
419       gdouble rate;
420       GstFormat format;
421       GstSeekFlags flags;
422       GstSeekType start_type, stop_type;
423       gint64 start, stop;
424       GstEvent *newsegment;
425       GSList *iter;
426
427       GST_INFO_OBJECT (mssdemux, "Received GST_EVENT_SEEK");
428
429       gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
430           &stop_type, &stop);
431
432       if (format != GST_FORMAT_TIME)
433         return FALSE;
434
435       GST_DEBUG_OBJECT (mssdemux,
436           "seek event, rate: %f start: %" GST_TIME_FORMAT " stop: %"
437           GST_TIME_FORMAT, rate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
438
439       if (flags & GST_SEEK_FLAG_FLUSH) {
440         GstEvent *flush = gst_event_new_flush_start ();
441         GST_DEBUG_OBJECT (mssdemux, "sending flush start");
442
443         gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
444         gst_mss_demux_push_src_event (mssdemux, flush);
445         gst_event_unref (flush);
446       }
447
448       gst_mss_demux_stop_tasks (mssdemux, TRUE);
449
450       if (!gst_mss_manifest_seek (mssdemux->manifest, start)) {;
451         GST_WARNING_OBJECT (mssdemux, "Could not find seeked fragment");
452         return FALSE;
453       }
454
455       newsegment =
456           gst_event_new_new_segment (FALSE, rate, format, start, stop, start);
457       gst_event_set_seqnum (newsegment, gst_event_get_seqnum (event));
458       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
459         GstMssDemuxStream *stream = iter->data;
460
461         stream->pending_newsegment = gst_event_ref (newsegment);
462       }
463       gst_event_unref (newsegment);
464
465       if (flags & GST_SEEK_FLAG_FLUSH) {
466         GstEvent *flush = gst_event_new_flush_stop ();
467         GST_DEBUG_OBJECT (mssdemux, "sending flush stop");
468
469         gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
470         gst_mss_demux_push_src_event (mssdemux, flush);
471         gst_event_unref (flush);
472       }
473
474       gst_mss_demux_restart_tasks (mssdemux);
475
476       return TRUE;
477     }
478     default:
479       break;
480   }
481
482   return gst_pad_event_default (pad, event);
483 }
484
485 static gboolean
486 gst_mss_demux_src_query (GstPad * pad, GstQuery * query)
487 {
488   GstMssDemux *mssdemux;
489   gboolean ret = FALSE;
490
491   if (query == NULL)
492     return FALSE;
493
494   mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad));
495
496   switch (query->type) {
497     case GST_QUERY_DURATION:{
498       GstClockTime duration = -1;
499       GstFormat fmt;
500
501       gst_query_parse_duration (query, &fmt, NULL);
502       if (fmt == GST_FORMAT_TIME && mssdemux->manifest) {
503         /* TODO should we use the streams accumulated duration or the main manifest duration? */
504         duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
505
506         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
507           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
508           ret = TRUE;
509         }
510       }
511       GST_INFO_OBJECT (mssdemux, "GST_QUERY_DURATION returns %s with duration %"
512           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
513       break;
514     }
515     case GST_QUERY_LATENCY:
516       gst_query_set_latency (query, FALSE, 0, -1);
517       ret = TRUE;
518       break;
519     case GST_QUERY_SEEKING:{
520       GstFormat fmt;
521       gint64 stop = -1;
522
523       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
524       GST_INFO_OBJECT (mssdemux, "Received GST_QUERY_SEEKING with format %d",
525           fmt);
526       if (fmt == GST_FORMAT_TIME) {
527         GstClockTime duration;
528         duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
529         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
530           stop = duration;
531         gst_query_set_seeking (query, fmt, TRUE, 0, stop);
532         ret = TRUE;
533         GST_INFO_OBJECT (mssdemux, "GST_QUERY_SEEKING returning with stop : %"
534             GST_TIME_FORMAT, GST_TIME_ARGS (stop));
535       }
536       break;
537     }
538     default:
539       /* Don't fordward queries upstream because of the special nature of this
540        *  "demuxer", which relies on the upstream element only to be fed
541        *  the Manifest
542        */
543       break;
544   }
545
546   return ret;
547 }
548
549 static void
550 _set_src_pad_functions (GstPad * pad)
551 {
552   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_query));
553   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_event));
554 }
555
556 static GstPad *
557 _create_pad (GstMssDemux * mssdemux, GstMssStream * manifeststream)
558 {
559   gchar *name;
560   GstPad *srcpad = NULL;
561   GstMssStreamType streamtype;
562
563   streamtype = gst_mss_stream_get_type (manifeststream);
564   GST_DEBUG_OBJECT (mssdemux, "Found stream of type: %s",
565       gst_mss_stream_type_name (streamtype));
566
567   /* TODO use stream's name/bitrate/index as the pad name? */
568   if (streamtype == MSS_STREAM_TYPE_VIDEO) {
569     name = g_strdup_printf ("video_%02u", mssdemux->n_videos++);
570     srcpad =
571         gst_pad_new_from_static_template (&gst_mss_demux_videosrc_template,
572         name);
573     g_free (name);
574   } else if (streamtype == MSS_STREAM_TYPE_AUDIO) {
575     name = g_strdup_printf ("audio_%02u", mssdemux->n_audios++);
576     srcpad =
577         gst_pad_new_from_static_template (&gst_mss_demux_audiosrc_template,
578         name);
579     g_free (name);
580   }
581
582   if (!srcpad) {
583     GST_WARNING_OBJECT (mssdemux, "Ignoring unknown type stream");
584     return NULL;
585   }
586
587   _set_src_pad_functions (srcpad);
588   return srcpad;
589 }
590
591 static void
592 gst_mss_demux_create_streams (GstMssDemux * mssdemux)
593 {
594   GSList *streams = gst_mss_manifest_get_streams (mssdemux->manifest);
595   GSList *iter;
596
597   if (streams == NULL) {
598     GST_INFO_OBJECT (mssdemux, "No streams found in the manifest");
599     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
600         (_("This file contains no playable streams.")),
601         ("no streams found at the Manifest"));
602     return;
603   }
604
605   for (iter = streams; iter; iter = g_slist_next (iter)) {
606     GstPad *srcpad = NULL;
607     GstMssDemuxStream *stream = NULL;
608     GstMssStream *manifeststream = iter->data;
609
610     srcpad = _create_pad (mssdemux, manifeststream);
611
612     if (!srcpad) {
613       continue;
614     }
615
616     stream = gst_mss_demux_stream_new (mssdemux, manifeststream, srcpad);
617     gst_mss_stream_set_active (manifeststream, TRUE);
618     mssdemux->streams = g_slist_append (mssdemux->streams, stream);
619   }
620
621   /* select initial bitrates */
622   GST_OBJECT_LOCK (mssdemux);
623   GST_INFO_OBJECT (mssdemux, "Changing max bitrate to %llu",
624       mssdemux->connection_speed);
625   gst_mss_manifest_change_bitrate (mssdemux->manifest,
626       mssdemux->connection_speed);
627   mssdemux->update_bitrates = FALSE;
628   GST_OBJECT_UNLOCK (mssdemux);
629 }
630
631 static gboolean
632 gst_mss_demux_expose_stream (GstMssDemux * mssdemux, GstMssDemuxStream * stream)
633 {
634   GstCaps *caps;
635   GstCaps *media_caps;
636   GstPad *pad = stream->pad;
637
638   media_caps = gst_mss_stream_get_caps (stream->manifest_stream);
639
640   if (media_caps) {
641     caps = gst_caps_new_simple ("video/quicktime", "variant", G_TYPE_STRING,
642         "mss-fragmented", "timescale", G_TYPE_UINT64,
643         gst_mss_stream_get_timescale (stream->manifest_stream), "media-caps",
644         GST_TYPE_CAPS, media_caps, NULL);
645     gst_caps_unref (media_caps);
646     gst_pad_set_caps (pad, caps);
647     gst_caps_unref (caps);
648
649     gst_pad_set_active (pad, TRUE);
650     GST_INFO_OBJECT (mssdemux, "Adding srcpad %s:%s with caps %" GST_PTR_FORMAT,
651         GST_DEBUG_PAD_NAME (pad), caps);
652     gst_object_ref (pad);
653     gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), pad);
654   } else {
655     GST_WARNING_OBJECT (mssdemux,
656         "Couldn't get caps from manifest stream %p %s, not exposing it", stream,
657         GST_PAD_NAME (stream->pad));
658     return FALSE;
659   }
660   return TRUE;
661 }
662
663 static void
664 gst_mss_demux_process_manifest (GstMssDemux * mssdemux)
665 {
666   GstQuery *query;
667   gchar *uri = NULL;
668   gboolean ret;
669   GSList *iter;
670
671   g_return_if_fail (mssdemux->manifest_buffer != NULL);
672   g_return_if_fail (mssdemux->manifest == NULL);
673
674   query = gst_query_new_uri ();
675   ret = gst_pad_peer_query (mssdemux->sinkpad, query);
676   if (ret) {
677     gchar *baseurl_end;
678     gst_query_parse_uri (query, &uri);
679     GST_INFO_OBJECT (mssdemux, "Upstream is using URI: %s", uri);
680
681     baseurl_end = g_strrstr (uri, "/Manifest");
682     if (baseurl_end) {
683       /* set the new end of the string */
684       baseurl_end[0] = '\0';
685     } else {
686       GST_WARNING_OBJECT (mssdemux, "Stream's URI didn't end with /manifest");
687     }
688
689     mssdemux->base_url = uri;
690   }
691   gst_query_unref (query);
692
693   mssdemux->manifest = gst_mss_manifest_new (mssdemux->manifest_buffer);
694   if (!mssdemux->manifest) {
695     GST_ELEMENT_ERROR (mssdemux, STREAM, FORMAT, ("Bad manifest file"),
696         ("Xml manifest file couldn't be parsed"));
697     return;
698   }
699
700   gst_mss_demux_create_streams (mssdemux);
701   for (iter = mssdemux->streams; iter;) {
702     GSList *current = iter;
703     GstMssDemuxStream *stream = iter->data;
704     iter = g_slist_next (iter); /* do it ourselves as we want it done in the beginning of the loop */
705     if (!gst_mss_demux_expose_stream (mssdemux, stream)) {
706       gst_mss_demux_stream_free (stream);
707       mssdemux->streams = g_slist_delete_link (mssdemux->streams, current);
708     }
709   }
710
711   if (!mssdemux->streams) {
712     /* no streams */
713     GST_WARNING_OBJECT (mssdemux, "Couldn't identify the caps for any of the "
714         "streams found in the manifest");
715     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
716         (_("This file contains no playable streams.")),
717         ("No known stream formats found at the Manifest"));
718     return;
719   }
720
721   gst_element_no_more_pads (GST_ELEMENT_CAST (mssdemux));
722 }
723
724 static void
725 gst_mss_demux_reconfigure (GstMssDemux * mssdemux)
726 {
727   GSList *oldpads = NULL;
728   GSList *iter;
729
730   gst_mss_demux_stop_tasks (mssdemux, FALSE);
731   if (gst_mss_manifest_change_bitrate (mssdemux->manifest,
732           mssdemux->connection_speed)) {
733
734     GST_DEBUG_OBJECT (mssdemux, "Creating new pad group");
735     /* if we changed the bitrate, we need to add new pads */
736     for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
737       GstMssDemuxStream *stream = iter->data;
738       GstClockTime ts =
739           gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
740
741       oldpads = g_slist_prepend (oldpads, stream->pad);
742
743       stream->pad = _create_pad (mssdemux, stream->manifest_stream);
744       /* TODO keep the same playback rate */
745       stream->pending_newsegment =
746           gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME, ts, -1, ts);
747       gst_mss_demux_expose_stream (mssdemux, stream);
748     }
749
750     gst_element_no_more_pads (GST_ELEMENT (mssdemux));
751
752     for (iter = oldpads; iter; iter = g_slist_next (iter)) {
753       GstPad *oldpad = iter->data;
754
755       /* Push out EOS */
756       gst_pad_push_event (oldpad, gst_event_new_eos ());
757       gst_pad_set_active (oldpad, FALSE);
758       gst_element_remove_pad (GST_ELEMENT (mssdemux), oldpad);
759       gst_object_unref (oldpad);
760     }
761   }
762   gst_mss_demux_restart_tasks (mssdemux);
763 }
764
765 static GstFlowReturn
766 gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
767     GstBuffer ** buffer)
768 {
769   GstMssDemux *mssdemux = stream->parent;
770   gchar *path;
771   gchar *url;
772   GstFragment *fragment;
773   GstBuffer *_buffer;
774   GstFlowReturn ret = GST_FLOW_OK;
775
776   GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream);
777   ret = gst_mss_stream_get_fragment_url (stream->manifest_stream, &path);
778   switch (ret) {
779     case GST_FLOW_OK:
780       break;                    /* all is good, let's go */
781     case GST_FLOW_UNEXPECTED:  /* EOS */
782       return GST_FLOW_UNEXPECTED;
783     case GST_FLOW_ERROR:
784       goto error;
785     default:
786       break;
787   }
788   if (!path) {
789     goto no_url_error;
790   }
791   GST_DEBUG_OBJECT (mssdemux, "Got url path '%s' for stream %p", path, stream);
792
793   url = g_strdup_printf ("%s/%s", mssdemux->base_url, path);
794
795   fragment = gst_uri_downloader_fetch_uri (stream->downloader, url);
796   g_free (path);
797   g_free (url);
798
799   if (!fragment) {
800     GST_INFO_OBJECT (mssdemux, "No fragment downloaded");
801     /* TODO check if we are truly stoping */
802     return GST_FLOW_ERROR;
803   }
804
805   _buffer = gst_fragment_get_buffer (fragment);
806   _buffer = gst_buffer_make_metadata_writable (_buffer);
807   gst_buffer_set_caps (_buffer, GST_PAD_CAPS (stream->pad));
808   GST_BUFFER_TIMESTAMP (_buffer) =
809       gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
810   GST_BUFFER_DURATION (_buffer) =
811       gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
812
813   *buffer = _buffer;
814   return ret;
815
816 no_url_error:
817   {
818     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
819         (_("Failed to get fragment URL.")),
820         ("An error happened when getting fragment URL"));
821     gst_task_stop (stream->stream_task);
822     return GST_FLOW_ERROR;
823   }
824 error:
825   {
826     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
827     gst_task_stop (stream->stream_task);
828     return GST_FLOW_ERROR;
829   }
830 }
831
832 static void
833 gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
834 {
835   GstMssDemux *mssdemux = stream->parent;
836   GstBuffer *buffer = NULL;
837   GstFlowReturn ret;
838
839   GST_OBJECT_LOCK (mssdemux);
840   if (mssdemux->update_bitrates) {
841     mssdemux->update_bitrates = FALSE;
842     GST_OBJECT_UNLOCK (mssdemux);
843
844     GST_DEBUG_OBJECT (mssdemux,
845         "Starting streams reconfiguration due to bitrate changes");
846     g_thread_create ((GThreadFunc) gst_mss_demux_reconfigure, mssdemux, FALSE,
847         NULL);
848     GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration");
849   } else {
850     GST_OBJECT_UNLOCK (mssdemux);
851   }
852
853   ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
854   switch (ret) {
855     case GST_FLOW_OK:
856       break;                    /* all is good, let's go */
857     case GST_FLOW_UNEXPECTED:  /* EOS */
858       goto eos;
859     case GST_FLOW_ERROR:
860       goto error;
861     default:
862       break;
863   }
864
865   g_assert (buffer != NULL);
866
867   if (G_UNLIKELY (stream->pending_newsegment)) {
868     gst_pad_push_event (stream->pad, stream->pending_newsegment);
869     stream->pending_newsegment = NULL;
870   }
871
872   GST_DEBUG_OBJECT (mssdemux, "Pushing buffer of size %u on pad %s",
873       GST_BUFFER_SIZE (buffer), GST_PAD_NAME (stream->pad));
874   ret = gst_pad_push (stream->pad, buffer);
875   switch (ret) {
876     case GST_FLOW_UNEXPECTED:
877       goto eos;                 /* EOS ? */
878     case GST_FLOW_ERROR:
879       goto error;
880     case GST_FLOW_NOT_LINKED:
881       break;                    /* TODO what to do here? pause the task or just keep pushing? */
882     case GST_FLOW_OK:
883     default:
884       break;
885   }
886
887   gst_mss_stream_advance_fragment (stream->manifest_stream);
888   return;
889
890 eos:
891   {
892     GstEvent *eos = gst_event_new_eos ();
893     GST_DEBUG_OBJECT (mssdemux, "Pushing EOS on pad %s:%s",
894         GST_DEBUG_PAD_NAME (stream->pad));
895     gst_pad_push_event (stream->pad, eos);
896     gst_task_stop (stream->stream_task);
897     return;
898   }
899 error:
900   {
901     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
902     gst_task_stop (stream->stream_task);
903     return;
904   }
905 }