e05ef53914aa926926fe8e30cf069927268ee500
[platform/upstream/gstreamer.git] / ext / smoothstreaming / gstmssdemux.c
1 /* GStreamer
2  * Copyright (C) 2012 Smart TV Alliance
3  *  Author: Thiago Sousa Santos <thiago.sousa.santos@collabora.com>, Collabora Ltd.
4  *
5  * gstmssdemux.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:element-mssdemux
25  *
26  * Demuxes a Microsoft's Smooth Streaming manifest into its audio and/or video streams.
27  *
28  * TODO
29  */
30
31 #ifdef HAVE_CONFIG_H
32 #include "config.h"
33 #endif
34
35 #include "gst/gst-i18n-plugin.h"
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40
41 #include "gstmssdemux.h"
42
43 GST_DEBUG_CATEGORY (mssdemux_debug);
44
45 #define DEFAULT_CONNECTION_SPEED 0
46
47 enum
48 {
49   PROP_0,
50
51   PROP_CONNECTION_SPEED,
52   PROP_LAST
53 };
54
55 static GstStaticPadTemplate gst_mss_demux_sink_template =
56 GST_STATIC_PAD_TEMPLATE ("sink",
57     GST_PAD_SINK,
58     GST_PAD_ALWAYS,
59     GST_STATIC_CAPS ("application/vnd.ms-sstr+xml")
60     );
61
62 static GstStaticPadTemplate gst_mss_demux_videosrc_template =
63 GST_STATIC_PAD_TEMPLATE ("video_%02u",
64     GST_PAD_SRC,
65     GST_PAD_SOMETIMES,
66     GST_STATIC_CAPS_ANY);
67
68 static GstStaticPadTemplate gst_mss_demux_audiosrc_template =
69 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
70     GST_PAD_SRC,
71     GST_PAD_SOMETIMES,
72     GST_STATIC_CAPS_ANY);
73
74 GST_BOILERPLATE (GstMssDemux, gst_mss_demux, GstMssDemux, GST_TYPE_ELEMENT);
75
76 static void gst_mss_demux_dispose (GObject * object);
77 static void gst_mss_demux_set_property (GObject * object, guint prop_id,
78     const GValue * value, GParamSpec * pspec);
79 static void gst_mss_demux_get_property (GObject * object, guint prop_id,
80     GValue * value, GParamSpec * pspec);
81 static GstStateChangeReturn gst_mss_demux_change_state (GstElement * element,
82     GstStateChange transition);
83 static GstFlowReturn gst_mss_demux_chain (GstPad * pad, GstBuffer * buffer);
84 static GstFlowReturn gst_mss_demux_event (GstPad * pad, GstEvent * event);
85
86 static gboolean gst_mss_demux_src_query (GstPad * pad, GstQuery * query);
87
88 static void gst_mss_demux_download_loop (GstMssDemuxStream * stream);
89 static void gst_mss_demux_stream_loop (GstMssDemux * mssdemux);
90
91 static gboolean gst_mss_demux_process_manifest (GstMssDemux * mssdemux);
92
93 static void
94 gst_mss_demux_base_init (gpointer klass)
95 {
96   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
97
98   gst_element_class_add_static_pad_template (element_class,
99       &gst_mss_demux_sink_template);
100   gst_element_class_add_static_pad_template (element_class,
101       &gst_mss_demux_videosrc_template);
102   gst_element_class_add_static_pad_template (element_class,
103       &gst_mss_demux_audiosrc_template);
104   gst_element_class_set_details_simple (element_class, "Smooth Streaming "
105       "demuxer", "Demuxer",
106       "Parse and demultiplex a Smooth Streaming manifest into audio and video "
107       "streams", "Thiago Santos <thiago.sousa.santos@collabora.com>");
108
109   GST_DEBUG_CATEGORY_INIT (mssdemux_debug, "mssdemux", 0, "mssdemux plugin");
110 }
111
112 static void
113 gst_mss_demux_class_init (GstMssDemuxClass * klass)
114 {
115   GObjectClass *gobject_class;
116   GstElementClass *gstelement_class;
117
118   gobject_class = (GObjectClass *) klass;
119   gstelement_class = (GstElementClass *) klass;
120
121   parent_class = g_type_class_peek_parent (klass);
122
123   gobject_class->dispose = gst_mss_demux_dispose;
124   gobject_class->set_property = gst_mss_demux_set_property;
125   gobject_class->get_property = gst_mss_demux_get_property;
126
127   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
128       g_param_spec_uint ("connection-speed", "Connection Speed",
129           "Network connection speed in kbps (0 = unknown)",
130           0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
131           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
132
133   gstelement_class->change_state =
134       GST_DEBUG_FUNCPTR (gst_mss_demux_change_state);
135 }
136
137 static void
138 gst_mss_demux_init (GstMssDemux * mssdemux, GstMssDemuxClass * klass)
139 {
140   mssdemux->sinkpad =
141       gst_pad_new_from_static_template (&gst_mss_demux_sink_template, "sink");
142   gst_pad_set_chain_function (mssdemux->sinkpad,
143       GST_DEBUG_FUNCPTR (gst_mss_demux_chain));
144   gst_pad_set_event_function (mssdemux->sinkpad,
145       GST_DEBUG_FUNCPTR (gst_mss_demux_event));
146   gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), mssdemux->sinkpad);
147
148   g_static_rec_mutex_init (&mssdemux->stream_lock);
149   mssdemux->stream_task =
150       gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, mssdemux);
151   gst_task_set_lock (mssdemux->stream_task, &mssdemux->stream_lock);
152 }
153
154 static gboolean
155 _data_queue_check_full (GstDataQueue * queue, guint visible, guint bytes,
156     guint64 time, gpointer checkdata)
157 {
158   GstMssDemuxStream *stream = checkdata;
159   GstMssDemux *mssdemux = stream->parent;
160
161   if (mssdemux->data_queue_max_size == 0)
162     return FALSE;               /* never full */
163   return visible >= mssdemux->data_queue_max_size;
164 }
165
166 static GstMssDemuxStream *
167 gst_mss_demux_stream_new (GstMssDemux * mssdemux,
168     GstMssStream * manifeststream, GstPad * srcpad)
169 {
170   GstMssDemuxStream *stream;
171
172   stream = g_new0 (GstMssDemuxStream, 1);
173   stream->downloader = gst_uri_downloader_new ();
174   stream->dataqueue = gst_data_queue_new (_data_queue_check_full, stream);
175
176   /* Downloading task */
177   g_static_rec_mutex_init (&stream->download_lock);
178   stream->download_task =
179       gst_task_create ((GstTaskFunction) gst_mss_demux_download_loop, stream);
180   gst_task_set_lock (stream->download_task, &stream->download_lock);
181
182   stream->pad = srcpad;
183   stream->manifest_stream = manifeststream;
184   stream->parent = mssdemux;
185
186   return stream;
187 }
188
189 static void
190 gst_mss_demux_stream_free (GstMssDemuxStream * stream)
191 {
192   if (stream->download_task) {
193     if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
194       GST_DEBUG_OBJECT (stream->parent, "Leaving streaming task %s:%s",
195           GST_DEBUG_PAD_NAME (stream->pad));
196       gst_task_stop (stream->download_task);
197       gst_uri_downloader_cancel (stream->downloader);
198       g_static_rec_mutex_lock (&stream->download_lock);
199       g_static_rec_mutex_unlock (&stream->download_lock);
200       GST_LOG_OBJECT (stream->parent, "Waiting for task to finish");
201       gst_task_join (stream->download_task);
202       GST_LOG_OBJECT (stream->parent, "Finished");
203     }
204     gst_object_unref (stream->download_task);
205     g_static_rec_mutex_free (&stream->download_lock);
206     stream->download_task = NULL;
207   }
208
209   if (stream->pending_newsegment) {
210     gst_event_unref (stream->pending_newsegment);
211     stream->pending_newsegment = NULL;
212   }
213
214
215   if (stream->downloader != NULL) {
216     g_object_unref (stream->downloader);
217     stream->downloader = NULL;
218   }
219   if (stream->dataqueue) {
220     g_object_unref (stream->dataqueue);
221     stream->dataqueue = NULL;
222   }
223   if (stream->pad) {
224     gst_object_unref (stream->pad);
225     stream->pad = NULL;
226   }
227   g_free (stream);
228 }
229
230 static void
231 gst_mss_demux_reset (GstMssDemux * mssdemux)
232 {
233   GSList *iter;
234
235   if (GST_TASK_STATE (mssdemux->stream_task) != GST_TASK_STOPPED) {
236     gst_task_stop (mssdemux->stream_task);
237     g_static_rec_mutex_lock (&mssdemux->stream_lock);
238     g_static_rec_mutex_unlock (&mssdemux->stream_lock);
239     gst_task_join (mssdemux->stream_task);
240   }
241
242   if (mssdemux->manifest_buffer) {
243     gst_buffer_unref (mssdemux->manifest_buffer);
244     mssdemux->manifest_buffer = NULL;
245   }
246
247   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
248     GstMssDemuxStream *stream = iter->data;
249     gst_element_remove_pad (GST_ELEMENT_CAST (mssdemux), stream->pad);
250     gst_mss_demux_stream_free (stream);
251   }
252   g_slist_free (mssdemux->streams);
253   mssdemux->streams = NULL;
254
255   if (mssdemux->manifest) {
256     gst_mss_manifest_free (mssdemux->manifest);
257     mssdemux->manifest = NULL;
258   }
259
260   mssdemux->n_videos = mssdemux->n_audios = 0;
261   g_free (mssdemux->base_url);
262   g_free (mssdemux->manifest_uri);
263   mssdemux->base_url = NULL;
264 }
265
266 static void
267 gst_mss_demux_dispose (GObject * object)
268 {
269   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (object);
270
271   if (mssdemux->stream_task) {
272     gst_object_unref (mssdemux->stream_task);
273     g_static_rec_mutex_free (&mssdemux->stream_lock);
274     mssdemux->stream_task = NULL;
275   }
276
277   G_OBJECT_CLASS (parent_class)->dispose (object);
278 }
279
280 static void
281 gst_mss_demux_set_property (GObject * object, guint prop_id,
282     const GValue * value, GParamSpec * pspec)
283 {
284   GstMssDemux *mssdemux = GST_MSS_DEMUX (object);
285
286   switch (prop_id) {
287     case PROP_CONNECTION_SPEED:
288       GST_OBJECT_LOCK (mssdemux);
289       mssdemux->connection_speed = g_value_get_uint (value) * 1000;
290       mssdemux->update_bitrates = TRUE;
291       GST_DEBUG_OBJECT (mssdemux, "Connection speed set to %llu",
292           mssdemux->connection_speed);
293       GST_OBJECT_UNLOCK (mssdemux);
294       break;
295     default:
296       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
297       break;
298   }
299 }
300
301 static void
302 gst_mss_demux_get_property (GObject * object, guint prop_id, GValue * value,
303     GParamSpec * pspec)
304 {
305   GstMssDemux *mssdemux = GST_MSS_DEMUX (object);
306
307   switch (prop_id) {
308     case PROP_CONNECTION_SPEED:
309       g_value_set_uint (value, mssdemux->connection_speed / 1000);
310       break;
311     default:
312       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
313       break;
314   }
315 }
316
317 static GstStateChangeReturn
318 gst_mss_demux_change_state (GstElement * element, GstStateChange transition)
319 {
320   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (element);
321   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
322
323   switch (transition) {
324     case GST_STATE_CHANGE_PAUSED_TO_READY:
325       gst_mss_demux_reset (mssdemux);
326       break;
327     case GST_STATE_CHANGE_READY_TO_NULL:
328       break;
329     default:
330       break;
331   }
332
333   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
334
335   switch (transition) {
336     case GST_STATE_CHANGE_PAUSED_TO_READY:{
337       break;
338     }
339     default:
340       break;
341   }
342
343   return result;
344 }
345
346 static GstFlowReturn
347 gst_mss_demux_chain (GstPad * pad, GstBuffer * buffer)
348 {
349   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (GST_PAD_PARENT (pad));
350   if (mssdemux->manifest_buffer == NULL)
351     mssdemux->manifest_buffer = buffer;
352   else
353     mssdemux->manifest_buffer =
354         gst_buffer_join (mssdemux->manifest_buffer, buffer);
355
356   return GST_FLOW_OK;
357 }
358
359 static void
360 gst_mss_demux_start (GstMssDemux * mssdemux)
361 {
362   GSList *iter;
363
364   GST_INFO_OBJECT (mssdemux, "Starting streams' tasks");
365   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
366     GstMssDemuxStream *stream = iter->data;
367     gst_task_start (stream->download_task);
368   }
369
370   gst_task_start (mssdemux->stream_task);
371 }
372
373 static gboolean
374 gst_mss_demux_push_src_event (GstMssDemux * mssdemux, GstEvent * event)
375 {
376   GSList *iter;
377   gboolean ret = TRUE;
378
379   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
380     GstMssDemuxStream *stream = iter->data;
381     gst_event_ref (event);
382     ret = ret & gst_pad_push_event (stream->pad, event);
383   }
384   return ret;
385 }
386
387 static gboolean
388 gst_mss_demux_event (GstPad * pad, GstEvent * event)
389 {
390   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (GST_PAD_PARENT (pad));
391   gboolean forward = TRUE;
392   gboolean ret = TRUE;
393
394   switch (GST_EVENT_TYPE (event)) {
395     case GST_EVENT_EOS:
396       if (mssdemux->manifest_buffer == NULL) {
397         GST_WARNING_OBJECT (mssdemux, "Received EOS without a manifest.");
398         break;
399       }
400
401       if (gst_mss_demux_process_manifest (mssdemux))
402         gst_mss_demux_start (mssdemux);
403       forward = FALSE;
404       break;
405     default:
406       break;
407   }
408
409   if (forward) {
410     ret = gst_pad_event_default (pad, event);
411   } else {
412     gst_event_unref (event);
413   }
414
415   return ret;
416 }
417
418 static void
419 gst_mss_demux_stop_tasks (GstMssDemux * mssdemux, gboolean immediate)
420 {
421   GSList *iter;
422
423   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
424     GstMssDemuxStream *stream = iter->data;
425
426     gst_data_queue_set_flushing (stream->dataqueue, TRUE);
427
428     if (immediate)
429       gst_uri_downloader_cancel (stream->downloader);
430     gst_task_pause (stream->download_task);
431   }
432   gst_task_pause (mssdemux->stream_task);
433
434   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
435     GstMssDemuxStream *stream = iter->data;
436     g_static_rec_mutex_lock (&stream->download_lock);
437   }
438   g_static_rec_mutex_lock (&mssdemux->stream_lock);
439 }
440
441 static void
442 gst_mss_demux_restart_tasks (GstMssDemux * mssdemux)
443 {
444   GSList *iter;
445   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
446     GstMssDemuxStream *stream = iter->data;
447     g_static_rec_mutex_unlock (&stream->download_lock);
448   }
449   g_static_rec_mutex_unlock (&mssdemux->stream_lock);
450   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
451     GstMssDemuxStream *stream = iter->data;
452
453     gst_data_queue_set_flushing (stream->dataqueue, FALSE);
454     gst_task_start (stream->download_task);
455   }
456   gst_task_start (mssdemux->stream_task);
457 }
458
459 static gboolean
460 gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
461 {
462   GstMssDemux *mssdemux;
463
464   mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad));
465
466   switch (event->type) {
467     case GST_EVENT_SEEK:
468     {
469       gdouble rate;
470       GstFormat format;
471       GstSeekFlags flags;
472       GstSeekType start_type, stop_type;
473       gint64 start, stop;
474       GstEvent *newsegment;
475       GSList *iter;
476
477       GST_INFO_OBJECT (mssdemux, "Received GST_EVENT_SEEK");
478
479       gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
480           &stop_type, &stop);
481
482       if (format != GST_FORMAT_TIME)
483         return FALSE;
484
485       GST_DEBUG_OBJECT (mssdemux,
486           "seek event, rate: %f start: %" GST_TIME_FORMAT " stop: %"
487           GST_TIME_FORMAT, rate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
488
489       if (flags & GST_SEEK_FLAG_FLUSH) {
490         GstEvent *flush = gst_event_new_flush_start ();
491         GST_DEBUG_OBJECT (mssdemux, "sending flush start");
492
493         gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
494         gst_mss_demux_push_src_event (mssdemux, flush);
495         gst_event_unref (flush);
496       }
497
498       gst_mss_demux_stop_tasks (mssdemux, TRUE);
499
500       if (!gst_mss_manifest_seek (mssdemux->manifest, start)) {;
501         GST_WARNING_OBJECT (mssdemux, "Could not find seeked fragment");
502         return FALSE;
503       }
504
505       newsegment =
506           gst_event_new_new_segment (FALSE, rate, format, start, stop, start);
507       gst_event_set_seqnum (newsegment, gst_event_get_seqnum (event));
508       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
509         GstMssDemuxStream *stream = iter->data;
510
511         stream->eos = FALSE;
512         gst_data_queue_flush (stream->dataqueue);
513         stream->pending_newsegment = gst_event_ref (newsegment);
514       }
515       gst_event_unref (newsegment);
516
517       if (flags & GST_SEEK_FLAG_FLUSH) {
518         GstEvent *flush = gst_event_new_flush_stop ();
519         GST_DEBUG_OBJECT (mssdemux, "sending flush stop");
520
521         gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
522         gst_mss_demux_push_src_event (mssdemux, flush);
523         gst_event_unref (flush);
524       }
525
526       gst_mss_demux_restart_tasks (mssdemux);
527
528       return TRUE;
529     }
530     default:
531       break;
532   }
533
534   return gst_pad_event_default (pad, event);
535 }
536
537 static gboolean
538 gst_mss_demux_src_query (GstPad * pad, GstQuery * query)
539 {
540   GstMssDemux *mssdemux;
541   gboolean ret = FALSE;
542
543   if (query == NULL)
544     return FALSE;
545
546   mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad));
547
548   switch (query->type) {
549     case GST_QUERY_DURATION:{
550       GstClockTime duration = -1;
551       GstFormat fmt;
552
553       gst_query_parse_duration (query, &fmt, NULL);
554       if (fmt == GST_FORMAT_TIME && mssdemux->manifest) {
555         /* TODO should we use the streams accumulated duration or the main manifest duration? */
556         duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
557
558         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
559           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
560           ret = TRUE;
561         }
562       }
563       GST_INFO_OBJECT (mssdemux, "GST_QUERY_DURATION returns %s with duration %"
564           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
565       break;
566     }
567     case GST_QUERY_LATENCY:{
568       gboolean live = FALSE;
569
570       live = mssdemux->manifest
571           && gst_mss_manifest_is_live (mssdemux->manifest);
572
573       gst_query_set_latency (query, live, 0, -1);
574       ret = TRUE;
575       break;
576     }
577     case GST_QUERY_SEEKING:{
578       GstFormat fmt;
579       gint64 stop = -1;
580
581       if (mssdemux->manifest && gst_mss_manifest_is_live (mssdemux->manifest)) {
582         return FALSE;           /* no live seeking */
583       }
584
585       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
586       GST_INFO_OBJECT (mssdemux, "Received GST_QUERY_SEEKING with format %d",
587           fmt);
588       if (fmt == GST_FORMAT_TIME) {
589         GstClockTime duration;
590         duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
591         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
592           stop = duration;
593         gst_query_set_seeking (query, fmt, TRUE, 0, stop);
594         ret = TRUE;
595         GST_INFO_OBJECT (mssdemux, "GST_QUERY_SEEKING returning with stop : %"
596             GST_TIME_FORMAT, GST_TIME_ARGS (stop));
597       }
598       break;
599     }
600     default:
601       /* Don't fordward queries upstream because of the special nature of this
602        *  "demuxer", which relies on the upstream element only to be fed
603        *  the Manifest
604        */
605       break;
606   }
607
608   return ret;
609 }
610
611 static void
612 _set_src_pad_functions (GstPad * pad)
613 {
614   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_query));
615   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_event));
616 }
617
618 static GstPad *
619 _create_pad (GstMssDemux * mssdemux, GstMssStream * manifeststream)
620 {
621   gchar *name;
622   GstPad *srcpad = NULL;
623   GstMssStreamType streamtype;
624
625   streamtype = gst_mss_stream_get_type (manifeststream);
626   GST_DEBUG_OBJECT (mssdemux, "Found stream of type: %s",
627       gst_mss_stream_type_name (streamtype));
628
629   /* TODO use stream's name/bitrate/index as the pad name? */
630   if (streamtype == MSS_STREAM_TYPE_VIDEO) {
631     name = g_strdup_printf ("video_%02u", mssdemux->n_videos++);
632     srcpad =
633         gst_pad_new_from_static_template (&gst_mss_demux_videosrc_template,
634         name);
635     g_free (name);
636   } else if (streamtype == MSS_STREAM_TYPE_AUDIO) {
637     name = g_strdup_printf ("audio_%02u", mssdemux->n_audios++);
638     srcpad =
639         gst_pad_new_from_static_template (&gst_mss_demux_audiosrc_template,
640         name);
641     g_free (name);
642   }
643
644   if (!srcpad) {
645     GST_WARNING_OBJECT (mssdemux, "Ignoring unknown type stream");
646     return NULL;
647   }
648
649   _set_src_pad_functions (srcpad);
650   return srcpad;
651 }
652
653 static void
654 gst_mss_demux_create_streams (GstMssDemux * mssdemux)
655 {
656   GSList *streams = gst_mss_manifest_get_streams (mssdemux->manifest);
657   GSList *iter;
658
659   if (streams == NULL) {
660     GST_INFO_OBJECT (mssdemux, "No streams found in the manifest");
661     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
662         (_("This file contains no playable streams.")),
663         ("no streams found at the Manifest"));
664     return;
665   }
666
667   for (iter = streams; iter; iter = g_slist_next (iter)) {
668     GstPad *srcpad = NULL;
669     GstMssDemuxStream *stream = NULL;
670     GstMssStream *manifeststream = iter->data;
671
672     srcpad = _create_pad (mssdemux, manifeststream);
673
674     if (!srcpad) {
675       continue;
676     }
677
678     stream = gst_mss_demux_stream_new (mssdemux, manifeststream, srcpad);
679     gst_mss_stream_set_active (manifeststream, TRUE);
680     mssdemux->streams = g_slist_append (mssdemux->streams, stream);
681   }
682
683   /* select initial bitrates */
684   GST_OBJECT_LOCK (mssdemux);
685   GST_INFO_OBJECT (mssdemux, "Changing max bitrate to %llu",
686       mssdemux->connection_speed);
687   gst_mss_manifest_change_bitrate (mssdemux->manifest,
688       mssdemux->connection_speed);
689   mssdemux->update_bitrates = FALSE;
690   GST_OBJECT_UNLOCK (mssdemux);
691 }
692
693 static gboolean
694 gst_mss_demux_expose_stream (GstMssDemux * mssdemux, GstMssDemuxStream * stream)
695 {
696   GstCaps *caps;
697   GstCaps *media_caps;
698   GstPad *pad = stream->pad;
699
700   media_caps = gst_mss_stream_get_caps (stream->manifest_stream);
701
702   if (media_caps) {
703     caps = gst_caps_new_simple ("video/quicktime", "variant", G_TYPE_STRING,
704         "mss-fragmented", "timescale", G_TYPE_UINT64,
705         gst_mss_stream_get_timescale (stream->manifest_stream), "media-caps",
706         GST_TYPE_CAPS, media_caps, NULL);
707     gst_caps_unref (media_caps);
708     gst_pad_set_caps (pad, caps);
709     gst_caps_unref (caps);
710
711     gst_pad_set_active (pad, TRUE);
712     GST_INFO_OBJECT (mssdemux, "Adding srcpad %s:%s with caps %" GST_PTR_FORMAT,
713         GST_DEBUG_PAD_NAME (pad), caps);
714     gst_object_ref (pad);
715     gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), pad);
716   } else {
717     GST_WARNING_OBJECT (mssdemux,
718         "Couldn't get caps from manifest stream %p %s, not exposing it", stream,
719         GST_PAD_NAME (stream->pad));
720     return FALSE;
721   }
722   return TRUE;
723 }
724
725 static gboolean
726 gst_mss_demux_process_manifest (GstMssDemux * mssdemux)
727 {
728   GstQuery *query;
729   gchar *uri = NULL;
730   gboolean ret;
731   GSList *iter;
732
733   g_return_val_if_fail (mssdemux->manifest_buffer != NULL, FALSE);
734   g_return_val_if_fail (mssdemux->manifest == NULL, FALSE);
735
736   query = gst_query_new_uri ();
737   ret = gst_pad_peer_query (mssdemux->sinkpad, query);
738   if (ret) {
739     gchar *baseurl_end;
740     gst_query_parse_uri (query, &uri);
741     GST_INFO_OBJECT (mssdemux, "Upstream is using URI: %s", uri);
742
743     mssdemux->manifest_uri = g_strdup (uri);
744     baseurl_end = g_strrstr (uri, "/Manifest");
745     if (baseurl_end) {
746       /* set the new end of the string */
747       baseurl_end[0] = '\0';
748     } else {
749       GST_WARNING_OBJECT (mssdemux, "Stream's URI didn't end with /manifest");
750     }
751
752     mssdemux->base_url = uri;
753   }
754   gst_query_unref (query);
755
756   if (mssdemux->base_url == NULL) {
757     GST_ELEMENT_ERROR (mssdemux, RESOURCE, NOT_FOUND,
758         (_("Couldn't get the Manifest's URI")),
759         ("need to get the manifest's URI from upstream elements"));
760     return FALSE;
761   }
762
763   mssdemux->manifest = gst_mss_manifest_new (mssdemux->manifest_buffer);
764   if (!mssdemux->manifest) {
765     GST_ELEMENT_ERROR (mssdemux, STREAM, FORMAT, ("Bad manifest file"),
766         ("Xml manifest file couldn't be parsed"));
767     return FALSE;
768   }
769
770   GST_INFO_OBJECT (mssdemux, "Live stream: %d",
771       gst_mss_manifest_is_live (mssdemux->manifest));
772
773   gst_mss_demux_create_streams (mssdemux);
774   for (iter = mssdemux->streams; iter;) {
775     GSList *current = iter;
776     GstMssDemuxStream *stream = iter->data;
777     iter = g_slist_next (iter); /* do it ourselves as we want it done in the beginning of the loop */
778     if (!gst_mss_demux_expose_stream (mssdemux, stream)) {
779       gst_mss_demux_stream_free (stream);
780       mssdemux->streams = g_slist_delete_link (mssdemux->streams, current);
781     }
782   }
783
784   if (!mssdemux->streams) {
785     /* no streams */
786     GST_WARNING_OBJECT (mssdemux, "Couldn't identify the caps for any of the "
787         "streams found in the manifest");
788     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
789         (_("This file contains no playable streams.")),
790         ("No known stream formats found at the Manifest"));
791     return FALSE;
792   }
793
794   gst_element_no_more_pads (GST_ELEMENT_CAST (mssdemux));
795   return TRUE;
796 }
797
798 static void
799 gst_mss_demux_reload_manifest (GstMssDemux * mssdemux)
800 {
801   GstUriDownloader *downloader;
802   GstFragment *manifest_data;
803   GstBuffer *manifest_buffer;
804
805   downloader = gst_uri_downloader_new ();
806
807   manifest_data =
808       gst_uri_downloader_fetch_uri (downloader, mssdemux->manifest_uri);
809   manifest_buffer = gst_fragment_get_buffer (manifest_data);
810   g_object_unref (manifest_data);
811
812   gst_mss_manifest_reload_fragments (mssdemux->manifest, manifest_buffer);
813   gst_buffer_replace (&mssdemux->manifest_buffer, manifest_buffer);
814   gst_buffer_unref (manifest_buffer);
815
816   g_object_unref (downloader);
817 }
818
819 static void
820 gst_mss_demux_reconfigure (GstMssDemux * mssdemux)
821 {
822   GSList *oldpads = NULL;
823   GSList *iter;
824
825   gst_mss_demux_stop_tasks (mssdemux, TRUE);
826   if (gst_mss_manifest_change_bitrate (mssdemux->manifest,
827           mssdemux->connection_speed)) {
828     GstClockTime newseg_ts = GST_CLOCK_TIME_NONE;
829
830     GST_DEBUG_OBJECT (mssdemux, "Creating new pad group");
831     /* if we changed the bitrate, we need to add new pads */
832     for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
833       GstMssDemuxStream *stream = iter->data;
834       GstPad *oldpad = stream->pad;
835       GstClockTime ts = GST_CLOCK_TIME_NONE;
836
837       oldpads = g_slist_prepend (oldpads, oldpad);
838
839       /* since we are flushing the queue, get the next un-pushed timestamp to seek
840        * and avoid gaps */
841       gst_data_queue_set_flushing (stream->dataqueue, FALSE);
842       if (!gst_data_queue_is_empty (stream->dataqueue)) {
843         GstDataQueueItem *item = NULL;
844
845         while (!gst_data_queue_is_empty (stream->dataqueue)
846             && !GST_CLOCK_TIME_IS_VALID (ts)) {
847           gst_data_queue_pop (stream->dataqueue, &item);
848
849           if (!item) {
850             g_assert_not_reached ();
851             break;
852           }
853
854           if (GST_IS_BUFFER (item->object)) {
855             GstBuffer *buffer = GST_BUFFER_CAST (item->object);
856
857             ts = GST_BUFFER_TIMESTAMP (buffer);
858           }
859           item->destroy (item);
860         }
861
862       }
863       if (!GST_CLOCK_TIME_IS_VALID (ts)) {
864         ts = gst_mss_stream_get_fragment_gst_timestamp
865             (stream->manifest_stream);
866       }
867
868       if (ts < newseg_ts)
869         newseg_ts = ts;
870
871       GST_DEBUG_OBJECT (mssdemux,
872           "Seeking stream %p %s to ts %" GST_TIME_FORMAT, stream,
873           GST_PAD_NAME (stream->pad), GST_TIME_ARGS (ts));
874       gst_mss_stream_seek (stream->manifest_stream, ts);
875       gst_data_queue_flush (stream->dataqueue);
876
877       stream->pad = _create_pad (mssdemux, stream->manifest_stream);
878       gst_mss_demux_expose_stream (mssdemux, stream);
879
880       gst_pad_push_event (oldpad, gst_event_new_eos ());
881     }
882
883     gst_element_no_more_pads (GST_ELEMENT (mssdemux));
884
885     for (iter = oldpads; iter; iter = g_slist_next (iter)) {
886       GstPad *oldpad = iter->data;
887
888       gst_pad_set_active (oldpad, FALSE);
889       gst_element_remove_pad (GST_ELEMENT (mssdemux), oldpad);
890       gst_object_unref (oldpad);
891     }
892     for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
893       GstMssDemuxStream *stream = iter->data;
894
895       stream->pending_newsegment =
896           gst_event_new_new_segment (TRUE, 1.0, GST_FORMAT_TIME, newseg_ts, -1,
897           newseg_ts);
898     }
899   }
900   gst_mss_demux_restart_tasks (mssdemux);
901 }
902
903 static void
904 _free_data_queue_item (gpointer obj)
905 {
906   GstDataQueueItem *item = obj;
907
908   gst_mini_object_unref (item->object);
909   g_slice_free (GstDataQueueItem, item);
910 }
911
912 static void
913 gst_mss_demux_stream_store_object (GstMssDemuxStream * stream,
914     GstMiniObject * obj)
915 {
916   GstDataQueueItem *item;
917
918   item = g_slice_new (GstDataQueueItem);
919   item->object = (GstMiniObject *) obj;
920
921   item->duration = 0;           /* we don't care */
922   item->size = 0;
923   item->visible = TRUE;
924
925   item->destroy = (GDestroyNotify) _free_data_queue_item;
926
927   if (!gst_data_queue_push (stream->dataqueue, item)) {
928     GST_DEBUG_OBJECT (stream->parent, "Failed to store object %p", obj);
929     gst_mini_object_unref (obj);
930     g_slice_free (GstDataQueueItem, item);
931   }
932 }
933
934 static GstFlowReturn
935 gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
936     GstBuffer ** buffer)
937 {
938   GstMssDemux *mssdemux = stream->parent;
939   gchar *path;
940   gchar *url;
941   GstFragment *fragment;
942   GstBuffer *_buffer;
943   GstFlowReturn ret = GST_FLOW_OK;
944
945   GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream);
946   ret = gst_mss_stream_get_fragment_url (stream->manifest_stream, &path);
947   switch (ret) {
948     case GST_FLOW_OK:
949       break;                    /* all is good, let's go */
950     case GST_FLOW_UNEXPECTED:  /* EOS */
951       gst_mss_demux_reload_manifest (mssdemux);
952       return GST_FLOW_OK;
953       return GST_FLOW_UNEXPECTED;
954     case GST_FLOW_ERROR:
955       goto error;
956     default:
957       break;
958   }
959   if (!path) {
960     goto no_url_error;
961   }
962   GST_DEBUG_OBJECT (mssdemux, "Got url path '%s' for stream %p", path, stream);
963
964   url = g_strdup_printf ("%s/%s", mssdemux->base_url, path);
965
966   GST_DEBUG_OBJECT (mssdemux, "Got url '%s' for stream %p", url, stream);
967
968   fragment = gst_uri_downloader_fetch_uri (stream->downloader, url);
969   g_free (path);
970   g_free (url);
971
972   if (!fragment) {
973     GST_INFO_OBJECT (mssdemux, "No fragment downloaded");
974     /* TODO check if we are truly stoping */
975     if (gst_mss_manifest_is_live (mssdemux->manifest)) {
976       /* looks like there is no way of knowing when a live stream has ended
977        * Have to assume we are falling behind and cause a manifest reload */
978       return GST_FLOW_OK;
979     }
980     return GST_FLOW_ERROR;
981   }
982
983   _buffer = gst_fragment_get_buffer (fragment);
984   _buffer = gst_buffer_make_metadata_writable (_buffer);
985   gst_buffer_set_caps (_buffer, GST_PAD_CAPS (stream->pad));
986   GST_BUFFER_TIMESTAMP (_buffer) =
987       gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
988   GST_BUFFER_DURATION (_buffer) =
989       gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
990
991   g_object_unref (fragment);
992
993   if (buffer)
994     *buffer = _buffer;
995
996   if (_buffer) {
997     GST_DEBUG_OBJECT (mssdemux,
998         "Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT
999         " Duration: %" GST_TIME_FORMAT,
1000         stream, GST_PAD_NAME (stream->pad),
1001         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)),
1002         GST_TIME_ARGS (GST_BUFFER_DURATION (_buffer)));
1003     gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (_buffer));
1004   }
1005
1006   return ret;
1007
1008 no_url_error:
1009   {
1010     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
1011         (_("Failed to get fragment URL.")),
1012         ("An error happened when getting fragment URL"));
1013     gst_task_stop (stream->download_task);
1014     return GST_FLOW_ERROR;
1015   }
1016 error:
1017   {
1018     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
1019     gst_task_stop (stream->download_task);
1020     return GST_FLOW_ERROR;
1021   }
1022 }
1023
1024 static void
1025 gst_mss_demux_download_loop (GstMssDemuxStream * stream)
1026 {
1027   GstMssDemux *mssdemux = stream->parent;
1028   GstBuffer *buffer = NULL;
1029   GstFlowReturn ret;
1030
1031   GST_LOG_OBJECT (mssdemux, "download loop start %p", stream);
1032
1033
1034   ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
1035   switch (ret) {
1036     case GST_FLOW_OK:
1037       break;                    /* all is good, let's go */
1038     case GST_FLOW_UNEXPECTED:  /* EOS */
1039       goto eos;
1040     case GST_FLOW_ERROR:
1041       goto error;
1042     default:
1043       break;
1044   }
1045
1046   if (buffer) {
1047     gst_mss_stream_advance_fragment (stream->manifest_stream);
1048   }
1049   GST_LOG_OBJECT (mssdemux, "download loop end %p", stream);
1050   return;
1051
1052 eos:
1053   {
1054     GST_DEBUG_OBJECT (mssdemux, "Storing EOS for pad %s:%s",
1055         GST_DEBUG_PAD_NAME (stream->pad));
1056     gst_mss_demux_stream_store_object (stream,
1057         GST_MINI_OBJECT_CAST (gst_event_new_eos ()));
1058     gst_task_stop (stream->download_task);
1059     return;
1060   }
1061 error:
1062   {
1063     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
1064     gst_task_stop (stream->download_task);
1065     return;
1066   }
1067 }
1068
1069 static GstFlowReturn
1070 gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux,
1071     GstMssDemuxStream ** stream)
1072 {
1073   GstFlowReturn ret = GST_FLOW_OK;
1074   GstMssDemuxStream *current = NULL;
1075   GstClockTime cur_time = GST_CLOCK_TIME_NONE;
1076   GSList *iter;
1077
1078   if (!mssdemux->streams)
1079     return GST_FLOW_ERROR;
1080
1081   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
1082     GstClockTime time;
1083     GstMssDemuxStream *other;
1084     GstDataQueueItem *item;
1085
1086     other = iter->data;
1087     if (other->eos) {
1088       continue;
1089     }
1090
1091     if (gst_data_queue_peek (other->dataqueue, &item)) {
1092     } else {
1093       /* flushing */
1094       return GST_FLOW_WRONG_STATE;
1095     }
1096
1097     if (GST_IS_EVENT (item->object)) {
1098       /* events have higher priority */
1099       current = other;
1100       break;
1101     }
1102     time = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (item->object));
1103     if (time < cur_time) {
1104       cur_time = time;
1105       current = other;
1106     }
1107   }
1108
1109   *stream = current;
1110   if (current == NULL)
1111     ret = GST_FLOW_UNEXPECTED;
1112   return ret;
1113 }
1114
1115 static void
1116 gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
1117 {
1118   GstMssDemuxStream *stream = NULL;
1119   GstFlowReturn ret;
1120   GstMiniObject *object = NULL;
1121   GstDataQueueItem *item = NULL;
1122
1123   GST_LOG_OBJECT (mssdemux, "Starting stream loop");
1124
1125   GST_OBJECT_LOCK (mssdemux);
1126   if (mssdemux->update_bitrates) {
1127     mssdemux->update_bitrates = FALSE;
1128     GST_OBJECT_UNLOCK (mssdemux);
1129
1130     GST_DEBUG_OBJECT (mssdemux,
1131         "Starting streams reconfiguration due to bitrate changes");
1132     gst_mss_demux_reconfigure (mssdemux);
1133     GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration");
1134   } else {
1135     GST_OBJECT_UNLOCK (mssdemux);
1136   }
1137
1138   ret = gst_mss_demux_select_latest_stream (mssdemux, &stream);
1139
1140   if (stream)
1141     GST_DEBUG_OBJECT (mssdemux,
1142         "Stream loop selected %p stream of pad %s. %d - %s", stream,
1143         GST_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret));
1144   else
1145     GST_DEBUG_OBJECT (mssdemux, "No streams selected -> %d - %s", ret,
1146         gst_flow_get_name (ret));
1147
1148   switch (ret) {
1149     case GST_FLOW_OK:
1150       break;
1151     case GST_FLOW_ERROR:
1152       goto error;
1153     case GST_FLOW_UNEXPECTED:
1154       goto eos;
1155     case GST_FLOW_WRONG_STATE:
1156       GST_DEBUG_OBJECT (mssdemux, "Wrong state, stopping task");
1157       goto stop;
1158     default:
1159       g_assert_not_reached ();
1160   }
1161
1162   GST_LOG_OBJECT (mssdemux, "popping next item from queue for stream %p %s",
1163       stream, GST_PAD_NAME (stream->pad));
1164   if (gst_data_queue_pop (stream->dataqueue, &item)) {
1165     if (item->object)
1166       object = gst_mini_object_ref (item->object);
1167     item->destroy (item);
1168   } else {
1169     GST_DEBUG_OBJECT (mssdemux,
1170         "Failed to get object from dataqueue on stream %p %s", stream,
1171         GST_PAD_NAME (stream->pad));
1172     goto stop;
1173   }
1174
1175   if (G_UNLIKELY (stream->pending_newsegment)) {
1176     gst_pad_push_event (stream->pad, stream->pending_newsegment);
1177     stream->pending_newsegment = NULL;
1178   }
1179
1180   if (G_LIKELY (GST_IS_BUFFER (object))) {
1181     if (GST_BUFFER_TIMESTAMP (object) != stream->next_timestamp) {
1182       GST_ERROR_OBJECT (mssdemux, "Marking buffer %p as discont buffer:%"
1183           GST_TIME_FORMAT " != expected:%" GST_TIME_FORMAT, object,
1184           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
1185           GST_TIME_ARGS (stream->next_timestamp));
1186       GST_BUFFER_FLAG_SET (object, GST_BUFFER_FLAG_DISCONT);
1187     }
1188
1189     GST_DEBUG_OBJECT (mssdemux,
1190         "Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
1191         " discont:%d on pad %s", object,
1192         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
1193         GST_TIME_ARGS (GST_BUFFER_DURATION (object)),
1194         GST_BUFFER_FLAG_IS_SET (object, GST_BUFFER_FLAG_DISCONT),
1195         GST_PAD_NAME (stream->pad));
1196
1197     stream->next_timestamp =
1198         GST_BUFFER_TIMESTAMP (object) + GST_BUFFER_DURATION (object);
1199
1200     ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object));
1201   } else if (GST_IS_EVENT (object)) {
1202     if (GST_EVENT_TYPE (object) == GST_EVENT_EOS)
1203       stream->eos = TRUE;
1204     GST_DEBUG_OBJECT (mssdemux, "Pushing event %p on pad %s", object,
1205         GST_PAD_NAME (stream->pad));
1206     gst_pad_push_event (stream->pad, GST_EVENT_CAST (object));
1207   } else {
1208     g_return_if_reached ();
1209   }
1210
1211   switch (ret) {
1212     case GST_FLOW_UNEXPECTED:
1213       goto eos;                 /* EOS ? */
1214     case GST_FLOW_ERROR:
1215       goto error;
1216     case GST_FLOW_NOT_LINKED:
1217       break;                    /* TODO what to do here? pause the task or just keep pushing? */
1218     case GST_FLOW_OK:
1219     default:
1220       break;
1221   }
1222
1223   GST_LOG_OBJECT (mssdemux, "Stream loop end");
1224   return;
1225
1226 eos:
1227   {
1228     GST_DEBUG_OBJECT (mssdemux, "EOS on all pads");
1229     gst_task_stop (mssdemux->stream_task);
1230     return;
1231   }
1232 error:
1233   {
1234     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
1235     gst_task_stop (mssdemux->stream_task);
1236     return;
1237   }
1238 stop:
1239   {
1240     GST_DEBUG_OBJECT (mssdemux, "Stopping streaming task");
1241     gst_task_stop (mssdemux->stream_task);
1242     return;
1243   }
1244 }