mssdemux: implement live streams handling
[platform/upstream/gstreamer.git] / ext / smoothstreaming / gstmssdemux.c
1 /* GStreamer
2  * Copyright (C) 2012 Smart TV Alliance
3  *  Author: Thiago Sousa Santos <thiago.sousa.santos@collabora.com>, Collabora Ltd.
4  *
5  * gstmssdemux.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:element-mssdemux
25  *
26  * Demuxes a Microsoft's Smooth Streaming manifest into its audio and/or video streams.
27  *
28  * TODO
29  */
30
31 #ifdef HAVE_CONFIG_H
32 #include "config.h"
33 #endif
34
35 #include "gst/gst-i18n-plugin.h"
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40
41 #include "gstmssdemux.h"
42
43 GST_DEBUG_CATEGORY (mssdemux_debug);
44
45 #define DEFAULT_CONNECTION_SPEED 0
46
47 enum
48 {
49   PROP_0,
50
51   PROP_CONNECTION_SPEED,
52   PROP_LAST
53 };
54
55 static GstStaticPadTemplate gst_mss_demux_sink_template =
56 GST_STATIC_PAD_TEMPLATE ("sink",
57     GST_PAD_SINK,
58     GST_PAD_ALWAYS,
59     GST_STATIC_CAPS ("application/vnd.ms-sstr+xml")
60     );
61
62 static GstStaticPadTemplate gst_mss_demux_videosrc_template =
63 GST_STATIC_PAD_TEMPLATE ("video_%02u",
64     GST_PAD_SRC,
65     GST_PAD_SOMETIMES,
66     GST_STATIC_CAPS_ANY);
67
68 static GstStaticPadTemplate gst_mss_demux_audiosrc_template =
69 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
70     GST_PAD_SRC,
71     GST_PAD_SOMETIMES,
72     GST_STATIC_CAPS_ANY);
73
74 GST_BOILERPLATE (GstMssDemux, gst_mss_demux, GstMssDemux, GST_TYPE_ELEMENT);
75
76 static void gst_mss_demux_dispose (GObject * object);
77 static void gst_mss_demux_set_property (GObject * object, guint prop_id,
78     const GValue * value, GParamSpec * pspec);
79 static void gst_mss_demux_get_property (GObject * object, guint prop_id,
80     GValue * value, GParamSpec * pspec);
81 static GstStateChangeReturn gst_mss_demux_change_state (GstElement * element,
82     GstStateChange transition);
83 static GstFlowReturn gst_mss_demux_chain (GstPad * pad, GstBuffer * buffer);
84 static GstFlowReturn gst_mss_demux_event (GstPad * pad, GstEvent * event);
85
86 static gboolean gst_mss_demux_src_query (GstPad * pad, GstQuery * query);
87
88 static void gst_mss_demux_download_loop (GstMssDemuxStream * stream);
89 static void gst_mss_demux_stream_loop (GstMssDemux * mssdemux);
90
91 static gboolean gst_mss_demux_process_manifest (GstMssDemux * mssdemux);
92
93 static void
94 gst_mss_demux_base_init (gpointer klass)
95 {
96   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
97
98   gst_element_class_add_static_pad_template (element_class,
99       &gst_mss_demux_sink_template);
100   gst_element_class_add_static_pad_template (element_class,
101       &gst_mss_demux_videosrc_template);
102   gst_element_class_add_static_pad_template (element_class,
103       &gst_mss_demux_audiosrc_template);
104   gst_element_class_set_details_simple (element_class, "Smooth Streaming "
105       "demuxer", "Demuxer",
106       "Parse and demultiplex a Smooth Streaming manifest into audio and video "
107       "streams", "Thiago Santos <thiago.sousa.santos@collabora.com>");
108
109   GST_DEBUG_CATEGORY_INIT (mssdemux_debug, "mssdemux", 0, "mssdemux plugin");
110 }
111
112 static void
113 gst_mss_demux_class_init (GstMssDemuxClass * klass)
114 {
115   GObjectClass *gobject_class;
116   GstElementClass *gstelement_class;
117
118   gobject_class = (GObjectClass *) klass;
119   gstelement_class = (GstElementClass *) klass;
120
121   parent_class = g_type_class_peek_parent (klass);
122
123   gobject_class->dispose = gst_mss_demux_dispose;
124   gobject_class->set_property = gst_mss_demux_set_property;
125   gobject_class->get_property = gst_mss_demux_get_property;
126
127   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
128       g_param_spec_uint ("connection-speed", "Connection Speed",
129           "Network connection speed in kbps (0 = unknown)",
130           0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
131           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
132
133   gstelement_class->change_state =
134       GST_DEBUG_FUNCPTR (gst_mss_demux_change_state);
135 }
136
137 static void
138 gst_mss_demux_init (GstMssDemux * mssdemux, GstMssDemuxClass * klass)
139 {
140   mssdemux->sinkpad =
141       gst_pad_new_from_static_template (&gst_mss_demux_sink_template, "sink");
142   gst_pad_set_chain_function (mssdemux->sinkpad,
143       GST_DEBUG_FUNCPTR (gst_mss_demux_chain));
144   gst_pad_set_event_function (mssdemux->sinkpad,
145       GST_DEBUG_FUNCPTR (gst_mss_demux_event));
146   gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), mssdemux->sinkpad);
147
148   g_static_rec_mutex_init (&mssdemux->stream_lock);
149   mssdemux->stream_task =
150       gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, mssdemux);
151   gst_task_set_lock (mssdemux->stream_task, &mssdemux->stream_lock);
152 }
153
154 static gboolean
155 _data_queue_check_full (GstDataQueue * queue, guint visible, guint bytes,
156     guint64 time, gpointer checkdata)
157 {
158   GstMssDemuxStream *stream = checkdata;
159   GstMssDemux *mssdemux = stream->parent;
160
161   if (mssdemux->data_queue_max_size == 0)
162     return FALSE;               /* never full */
163   return visible >= mssdemux->data_queue_max_size;
164 }
165
166 static GstMssDemuxStream *
167 gst_mss_demux_stream_new (GstMssDemux * mssdemux,
168     GstMssStream * manifeststream, GstPad * srcpad)
169 {
170   GstMssDemuxStream *stream;
171
172   stream = g_new0 (GstMssDemuxStream, 1);
173   stream->downloader = gst_uri_downloader_new ();
174   stream->dataqueue = gst_data_queue_new (_data_queue_check_full, stream);
175
176   /* Downloading task */
177   g_static_rec_mutex_init (&stream->download_lock);
178   stream->download_task =
179       gst_task_create ((GstTaskFunction) gst_mss_demux_download_loop, stream);
180   gst_task_set_lock (stream->download_task, &stream->download_lock);
181
182   stream->pad = srcpad;
183   stream->manifest_stream = manifeststream;
184   stream->parent = mssdemux;
185
186   return stream;
187 }
188
189 static void
190 gst_mss_demux_stream_free (GstMssDemuxStream * stream)
191 {
192   if (stream->download_task) {
193     if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
194       GST_DEBUG_OBJECT (stream->parent, "Leaving streaming task %s:%s",
195           GST_DEBUG_PAD_NAME (stream->pad));
196       gst_task_stop (stream->download_task);
197       g_static_rec_mutex_lock (&stream->download_lock);
198       g_static_rec_mutex_unlock (&stream->download_lock);
199       GST_LOG_OBJECT (stream->parent, "Waiting for task to finish");
200       gst_task_join (stream->download_task);
201       GST_LOG_OBJECT (stream->parent, "Finished");
202     }
203     gst_object_unref (stream->download_task);
204     g_static_rec_mutex_free (&stream->download_lock);
205     stream->download_task = NULL;
206   }
207
208   if (stream->pending_newsegment) {
209     gst_event_unref (stream->pending_newsegment);
210     stream->pending_newsegment = NULL;
211   }
212
213
214   if (stream->downloader != NULL) {
215     g_object_unref (stream->downloader);
216     stream->downloader = NULL;
217   }
218   if (stream->dataqueue) {
219     g_object_unref (stream->dataqueue);
220     stream->dataqueue = NULL;
221   }
222   if (stream->pad) {
223     gst_object_unref (stream->pad);
224     stream->pad = NULL;
225   }
226   g_free (stream);
227 }
228
229 static void
230 gst_mss_demux_reset (GstMssDemux * mssdemux)
231 {
232   GSList *iter;
233
234   if (GST_TASK_STATE (mssdemux->stream_task) != GST_TASK_STOPPED) {
235     gst_task_stop (mssdemux->stream_task);
236     g_static_rec_mutex_lock (&mssdemux->stream_lock);
237     g_static_rec_mutex_unlock (&mssdemux->stream_lock);
238     gst_task_join (mssdemux->stream_task);
239   }
240
241   if (mssdemux->manifest_buffer) {
242     gst_buffer_unref (mssdemux->manifest_buffer);
243     mssdemux->manifest_buffer = NULL;
244   }
245
246   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
247     GstMssDemuxStream *stream = iter->data;
248     gst_element_remove_pad (GST_ELEMENT_CAST (mssdemux), stream->pad);
249     gst_mss_demux_stream_free (stream);
250   }
251   g_slist_free (mssdemux->streams);
252   mssdemux->streams = NULL;
253
254   if (mssdemux->manifest) {
255     gst_mss_manifest_free (mssdemux->manifest);
256     mssdemux->manifest = NULL;
257   }
258
259   mssdemux->n_videos = mssdemux->n_audios = 0;
260   g_free (mssdemux->base_url);
261   g_free (mssdemux->manifest_uri);
262   mssdemux->base_url = NULL;
263 }
264
265 static void
266 gst_mss_demux_dispose (GObject * object)
267 {
268   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (object);
269
270   if (mssdemux->stream_task) {
271     gst_object_unref (mssdemux->stream_task);
272     g_static_rec_mutex_free (&mssdemux->stream_lock);
273     mssdemux->stream_task = NULL;
274   }
275
276   G_OBJECT_CLASS (parent_class)->dispose (object);
277 }
278
279 static void
280 gst_mss_demux_set_property (GObject * object, guint prop_id,
281     const GValue * value, GParamSpec * pspec)
282 {
283   GstMssDemux *mssdemux = GST_MSS_DEMUX (object);
284
285   switch (prop_id) {
286     case PROP_CONNECTION_SPEED:
287       GST_OBJECT_LOCK (mssdemux);
288       mssdemux->connection_speed = g_value_get_uint (value) * 1000;
289       mssdemux->update_bitrates = TRUE;
290       GST_DEBUG_OBJECT (mssdemux, "Connection speed set to %llu",
291           mssdemux->connection_speed);
292       GST_OBJECT_UNLOCK (mssdemux);
293       break;
294     default:
295       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
296       break;
297   }
298 }
299
300 static void
301 gst_mss_demux_get_property (GObject * object, guint prop_id, GValue * value,
302     GParamSpec * pspec)
303 {
304   GstMssDemux *mssdemux = GST_MSS_DEMUX (object);
305
306   switch (prop_id) {
307     case PROP_CONNECTION_SPEED:
308       g_value_set_uint (value, mssdemux->connection_speed / 1000);
309       break;
310     default:
311       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
312       break;
313   }
314 }
315
316 static GstStateChangeReturn
317 gst_mss_demux_change_state (GstElement * element, GstStateChange transition)
318 {
319   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (element);
320   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
321
322   switch (transition) {
323     case GST_STATE_CHANGE_PAUSED_TO_READY:
324       gst_mss_demux_reset (mssdemux);
325       break;
326     case GST_STATE_CHANGE_READY_TO_NULL:
327       break;
328     default:
329       break;
330   }
331
332   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
333
334   switch (transition) {
335     case GST_STATE_CHANGE_PAUSED_TO_READY:{
336       break;
337     }
338     default:
339       break;
340   }
341
342   return result;
343 }
344
345 static GstFlowReturn
346 gst_mss_demux_chain (GstPad * pad, GstBuffer * buffer)
347 {
348   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (GST_PAD_PARENT (pad));
349   if (mssdemux->manifest_buffer == NULL)
350     mssdemux->manifest_buffer = buffer;
351   else
352     mssdemux->manifest_buffer =
353         gst_buffer_join (mssdemux->manifest_buffer, buffer);
354
355   return GST_FLOW_OK;
356 }
357
358 static void
359 gst_mss_demux_start (GstMssDemux * mssdemux)
360 {
361   GSList *iter;
362
363   GST_INFO_OBJECT (mssdemux, "Starting streams' tasks");
364   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
365     GstMssDemuxStream *stream = iter->data;
366     gst_task_start (stream->download_task);
367   }
368
369   gst_task_start (mssdemux->stream_task);
370 }
371
372 static gboolean
373 gst_mss_demux_push_src_event (GstMssDemux * mssdemux, GstEvent * event)
374 {
375   GSList *iter;
376   gboolean ret = TRUE;
377
378   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
379     GstMssDemuxStream *stream = iter->data;
380     gst_event_ref (event);
381     ret = ret & gst_pad_push_event (stream->pad, event);
382   }
383   return ret;
384 }
385
386 static gboolean
387 gst_mss_demux_event (GstPad * pad, GstEvent * event)
388 {
389   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (GST_PAD_PARENT (pad));
390   gboolean forward = TRUE;
391   gboolean ret = TRUE;
392
393   switch (GST_EVENT_TYPE (event)) {
394     case GST_EVENT_EOS:
395       if (mssdemux->manifest_buffer == NULL) {
396         GST_WARNING_OBJECT (mssdemux, "Received EOS without a manifest.");
397         break;
398       }
399
400       if (gst_mss_demux_process_manifest (mssdemux))
401         gst_mss_demux_start (mssdemux);
402       forward = FALSE;
403       break;
404     default:
405       break;
406   }
407
408   if (forward) {
409     ret = gst_pad_event_default (pad, event);
410   } else {
411     gst_event_unref (event);
412   }
413
414   return ret;
415 }
416
417 static void
418 gst_mss_demux_stop_tasks (GstMssDemux * mssdemux, gboolean immediate)
419 {
420   GSList *iter;
421
422   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
423     GstMssDemuxStream *stream = iter->data;
424
425     gst_data_queue_set_flushing (stream->dataqueue, TRUE);
426
427     if (immediate)
428       gst_uri_downloader_cancel (stream->downloader);
429     gst_task_pause (stream->download_task);
430   }
431   gst_task_pause (mssdemux->stream_task);
432
433   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
434     GstMssDemuxStream *stream = iter->data;
435     g_static_rec_mutex_lock (&stream->download_lock);
436   }
437   g_static_rec_mutex_lock (&mssdemux->stream_lock);
438 }
439
440 static void
441 gst_mss_demux_restart_tasks (GstMssDemux * mssdemux)
442 {
443   GSList *iter;
444   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
445     GstMssDemuxStream *stream = iter->data;
446     g_static_rec_mutex_unlock (&stream->download_lock);
447   }
448   g_static_rec_mutex_unlock (&mssdemux->stream_lock);
449   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
450     GstMssDemuxStream *stream = iter->data;
451
452     gst_data_queue_set_flushing (stream->dataqueue, FALSE);
453     gst_task_start (stream->download_task);
454   }
455   gst_task_start (mssdemux->stream_task);
456 }
457
458 static gboolean
459 gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
460 {
461   GstMssDemux *mssdemux;
462
463   mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad));
464
465   switch (event->type) {
466     case GST_EVENT_SEEK:
467     {
468       gdouble rate;
469       GstFormat format;
470       GstSeekFlags flags;
471       GstSeekType start_type, stop_type;
472       gint64 start, stop;
473       GstEvent *newsegment;
474       GSList *iter;
475
476       GST_INFO_OBJECT (mssdemux, "Received GST_EVENT_SEEK");
477
478       gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
479           &stop_type, &stop);
480
481       if (format != GST_FORMAT_TIME)
482         return FALSE;
483
484       GST_DEBUG_OBJECT (mssdemux,
485           "seek event, rate: %f start: %" GST_TIME_FORMAT " stop: %"
486           GST_TIME_FORMAT, rate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
487
488       if (flags & GST_SEEK_FLAG_FLUSH) {
489         GstEvent *flush = gst_event_new_flush_start ();
490         GST_DEBUG_OBJECT (mssdemux, "sending flush start");
491
492         gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
493         gst_mss_demux_push_src_event (mssdemux, flush);
494         gst_event_unref (flush);
495       }
496
497       gst_mss_demux_stop_tasks (mssdemux, TRUE);
498
499       if (!gst_mss_manifest_seek (mssdemux->manifest, start)) {;
500         GST_WARNING_OBJECT (mssdemux, "Could not find seeked fragment");
501         return FALSE;
502       }
503
504       newsegment =
505           gst_event_new_new_segment (FALSE, rate, format, start, stop, start);
506       gst_event_set_seqnum (newsegment, gst_event_get_seqnum (event));
507       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
508         GstMssDemuxStream *stream = iter->data;
509
510         stream->eos = FALSE;
511         gst_data_queue_flush (stream->dataqueue);
512         stream->pending_newsegment = gst_event_ref (newsegment);
513       }
514       gst_event_unref (newsegment);
515
516       if (flags & GST_SEEK_FLAG_FLUSH) {
517         GstEvent *flush = gst_event_new_flush_stop ();
518         GST_DEBUG_OBJECT (mssdemux, "sending flush stop");
519
520         gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
521         gst_mss_demux_push_src_event (mssdemux, flush);
522         gst_event_unref (flush);
523       }
524
525       gst_mss_demux_restart_tasks (mssdemux);
526
527       return TRUE;
528     }
529     default:
530       break;
531   }
532
533   return gst_pad_event_default (pad, event);
534 }
535
536 static gboolean
537 gst_mss_demux_src_query (GstPad * pad, GstQuery * query)
538 {
539   GstMssDemux *mssdemux;
540   gboolean ret = FALSE;
541
542   if (query == NULL)
543     return FALSE;
544
545   mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad));
546
547   switch (query->type) {
548     case GST_QUERY_DURATION:{
549       GstClockTime duration = -1;
550       GstFormat fmt;
551
552       gst_query_parse_duration (query, &fmt, NULL);
553       if (fmt == GST_FORMAT_TIME && mssdemux->manifest) {
554         /* TODO should we use the streams accumulated duration or the main manifest duration? */
555         duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
556
557         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
558           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
559           ret = TRUE;
560         }
561       }
562       GST_INFO_OBJECT (mssdemux, "GST_QUERY_DURATION returns %s with duration %"
563           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
564       break;
565     }
566     case GST_QUERY_LATENCY:{
567       gboolean live = FALSE;
568
569       live = mssdemux->manifest
570           && gst_mss_manifest_is_live (mssdemux->manifest);
571
572       gst_query_set_latency (query, live, 0, -1);
573       ret = TRUE;
574       break;
575     }
576     case GST_QUERY_SEEKING:{
577       GstFormat fmt;
578       gint64 stop = -1;
579
580       if (mssdemux->manifest && gst_mss_manifest_is_live (mssdemux->manifest)) {
581         return FALSE;           /* no live seeking */
582       }
583
584       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
585       GST_INFO_OBJECT (mssdemux, "Received GST_QUERY_SEEKING with format %d",
586           fmt);
587       if (fmt == GST_FORMAT_TIME) {
588         GstClockTime duration;
589         duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
590         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
591           stop = duration;
592         gst_query_set_seeking (query, fmt, TRUE, 0, stop);
593         ret = TRUE;
594         GST_INFO_OBJECT (mssdemux, "GST_QUERY_SEEKING returning with stop : %"
595             GST_TIME_FORMAT, GST_TIME_ARGS (stop));
596       }
597       break;
598     }
599     default:
600       /* Don't fordward queries upstream because of the special nature of this
601        *  "demuxer", which relies on the upstream element only to be fed
602        *  the Manifest
603        */
604       break;
605   }
606
607   return ret;
608 }
609
610 static void
611 _set_src_pad_functions (GstPad * pad)
612 {
613   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_query));
614   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_event));
615 }
616
617 static GstPad *
618 _create_pad (GstMssDemux * mssdemux, GstMssStream * manifeststream)
619 {
620   gchar *name;
621   GstPad *srcpad = NULL;
622   GstMssStreamType streamtype;
623
624   streamtype = gst_mss_stream_get_type (manifeststream);
625   GST_DEBUG_OBJECT (mssdemux, "Found stream of type: %s",
626       gst_mss_stream_type_name (streamtype));
627
628   /* TODO use stream's name/bitrate/index as the pad name? */
629   if (streamtype == MSS_STREAM_TYPE_VIDEO) {
630     name = g_strdup_printf ("video_%02u", mssdemux->n_videos++);
631     srcpad =
632         gst_pad_new_from_static_template (&gst_mss_demux_videosrc_template,
633         name);
634     g_free (name);
635   } else if (streamtype == MSS_STREAM_TYPE_AUDIO) {
636     name = g_strdup_printf ("audio_%02u", mssdemux->n_audios++);
637     srcpad =
638         gst_pad_new_from_static_template (&gst_mss_demux_audiosrc_template,
639         name);
640     g_free (name);
641   }
642
643   if (!srcpad) {
644     GST_WARNING_OBJECT (mssdemux, "Ignoring unknown type stream");
645     return NULL;
646   }
647
648   _set_src_pad_functions (srcpad);
649   return srcpad;
650 }
651
652 static void
653 gst_mss_demux_create_streams (GstMssDemux * mssdemux)
654 {
655   GSList *streams = gst_mss_manifest_get_streams (mssdemux->manifest);
656   GSList *iter;
657
658   if (streams == NULL) {
659     GST_INFO_OBJECT (mssdemux, "No streams found in the manifest");
660     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
661         (_("This file contains no playable streams.")),
662         ("no streams found at the Manifest"));
663     return;
664   }
665
666   for (iter = streams; iter; iter = g_slist_next (iter)) {
667     GstPad *srcpad = NULL;
668     GstMssDemuxStream *stream = NULL;
669     GstMssStream *manifeststream = iter->data;
670
671     srcpad = _create_pad (mssdemux, manifeststream);
672
673     if (!srcpad) {
674       continue;
675     }
676
677     stream = gst_mss_demux_stream_new (mssdemux, manifeststream, srcpad);
678     gst_mss_stream_set_active (manifeststream, TRUE);
679     mssdemux->streams = g_slist_append (mssdemux->streams, stream);
680   }
681
682   /* select initial bitrates */
683   GST_OBJECT_LOCK (mssdemux);
684   GST_INFO_OBJECT (mssdemux, "Changing max bitrate to %llu",
685       mssdemux->connection_speed);
686   gst_mss_manifest_change_bitrate (mssdemux->manifest,
687       mssdemux->connection_speed);
688   mssdemux->update_bitrates = FALSE;
689   GST_OBJECT_UNLOCK (mssdemux);
690 }
691
692 static gboolean
693 gst_mss_demux_expose_stream (GstMssDemux * mssdemux, GstMssDemuxStream * stream)
694 {
695   GstCaps *caps;
696   GstCaps *media_caps;
697   GstPad *pad = stream->pad;
698
699   media_caps = gst_mss_stream_get_caps (stream->manifest_stream);
700
701   if (media_caps) {
702     caps = gst_caps_new_simple ("video/quicktime", "variant", G_TYPE_STRING,
703         "mss-fragmented", "timescale", G_TYPE_UINT64,
704         gst_mss_stream_get_timescale (stream->manifest_stream), "media-caps",
705         GST_TYPE_CAPS, media_caps, NULL);
706     gst_caps_unref (media_caps);
707     gst_pad_set_caps (pad, caps);
708     gst_caps_unref (caps);
709
710     gst_pad_set_active (pad, TRUE);
711     GST_INFO_OBJECT (mssdemux, "Adding srcpad %s:%s with caps %" GST_PTR_FORMAT,
712         GST_DEBUG_PAD_NAME (pad), caps);
713     gst_object_ref (pad);
714     gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), pad);
715   } else {
716     GST_WARNING_OBJECT (mssdemux,
717         "Couldn't get caps from manifest stream %p %s, not exposing it", stream,
718         GST_PAD_NAME (stream->pad));
719     return FALSE;
720   }
721   return TRUE;
722 }
723
724 static gboolean
725 gst_mss_demux_process_manifest (GstMssDemux * mssdemux)
726 {
727   GstQuery *query;
728   gchar *uri = NULL;
729   gboolean ret;
730   GSList *iter;
731
732   g_return_val_if_fail (mssdemux->manifest_buffer != NULL, FALSE);
733   g_return_val_if_fail (mssdemux->manifest == NULL, FALSE);
734
735   query = gst_query_new_uri ();
736   ret = gst_pad_peer_query (mssdemux->sinkpad, query);
737   if (ret) {
738     gchar *baseurl_end;
739     gst_query_parse_uri (query, &uri);
740     GST_INFO_OBJECT (mssdemux, "Upstream is using URI: %s", uri);
741
742     mssdemux->manifest_uri = g_strdup (uri);
743     baseurl_end = g_strrstr (uri, "/Manifest");
744     if (baseurl_end) {
745       /* set the new end of the string */
746       baseurl_end[0] = '\0';
747     } else {
748       GST_WARNING_OBJECT (mssdemux, "Stream's URI didn't end with /manifest");
749     }
750
751     mssdemux->base_url = uri;
752   }
753   gst_query_unref (query);
754
755   if (mssdemux->base_url == NULL) {
756     GST_ELEMENT_ERROR (mssdemux, RESOURCE, NOT_FOUND,
757         (_("Couldn't get the Manifest's URI")),
758         ("need to get the manifest's URI from upstream elements"));
759     return FALSE;
760   }
761
762   mssdemux->manifest = gst_mss_manifest_new (mssdemux->manifest_buffer);
763   if (!mssdemux->manifest) {
764     GST_ELEMENT_ERROR (mssdemux, STREAM, FORMAT, ("Bad manifest file"),
765         ("Xml manifest file couldn't be parsed"));
766     return FALSE;
767   }
768
769   GST_INFO_OBJECT (mssdemux, "Live stream: %d",
770       gst_mss_manifest_is_live (mssdemux->manifest));
771
772   gst_mss_demux_create_streams (mssdemux);
773   for (iter = mssdemux->streams; iter;) {
774     GSList *current = iter;
775     GstMssDemuxStream *stream = iter->data;
776     iter = g_slist_next (iter); /* do it ourselves as we want it done in the beginning of the loop */
777     if (!gst_mss_demux_expose_stream (mssdemux, stream)) {
778       gst_mss_demux_stream_free (stream);
779       mssdemux->streams = g_slist_delete_link (mssdemux->streams, current);
780     }
781   }
782
783   if (!mssdemux->streams) {
784     /* no streams */
785     GST_WARNING_OBJECT (mssdemux, "Couldn't identify the caps for any of the "
786         "streams found in the manifest");
787     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
788         (_("This file contains no playable streams.")),
789         ("No known stream formats found at the Manifest"));
790     return FALSE;
791   }
792
793   gst_element_no_more_pads (GST_ELEMENT_CAST (mssdemux));
794   return TRUE;
795 }
796
797 static void
798 gst_mss_demux_reload_manifest (GstMssDemux * mssdemux)
799 {
800   GstUriDownloader *downloader;
801   GstFragment *manifest_data;
802   GstBuffer *manifest_buffer;
803
804   downloader = gst_uri_downloader_new ();
805
806   manifest_data =
807       gst_uri_downloader_fetch_uri (downloader, mssdemux->manifest_uri);
808   manifest_buffer = gst_fragment_get_buffer (manifest_data);
809   g_object_unref (manifest_data);
810
811   gst_mss_manifest_reload_fragments (mssdemux->manifest, manifest_buffer);
812   gst_buffer_replace (&mssdemux->manifest_buffer, manifest_buffer);
813   gst_buffer_unref (manifest_buffer);
814
815   g_object_unref (downloader);
816 }
817
818 static void
819 gst_mss_demux_reconfigure (GstMssDemux * mssdemux)
820 {
821   GSList *oldpads = NULL;
822   GSList *iter;
823
824   gst_mss_demux_stop_tasks (mssdemux, TRUE);
825   if (gst_mss_manifest_change_bitrate (mssdemux->manifest,
826           mssdemux->connection_speed)) {
827     GstClockTime newseg_ts = GST_CLOCK_TIME_NONE;
828
829     GST_DEBUG_OBJECT (mssdemux, "Creating new pad group");
830     /* if we changed the bitrate, we need to add new pads */
831     for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
832       GstMssDemuxStream *stream = iter->data;
833       GstPad *oldpad = stream->pad;
834       GstClockTime ts = GST_CLOCK_TIME_NONE;
835
836       oldpads = g_slist_prepend (oldpads, oldpad);
837
838       /* since we are flushing the queue, get the next un-pushed timestamp to seek
839        * and avoid gaps */
840       gst_data_queue_set_flushing (stream->dataqueue, FALSE);
841       if (!gst_data_queue_is_empty (stream->dataqueue)) {
842         GstDataQueueItem *item = NULL;
843
844         while (!gst_data_queue_is_empty (stream->dataqueue)
845             && !GST_CLOCK_TIME_IS_VALID (ts)) {
846           gst_data_queue_pop (stream->dataqueue, &item);
847
848           if (!item) {
849             g_assert_not_reached ();
850             break;
851           }
852
853           if (GST_IS_BUFFER (item->object)) {
854             GstBuffer *buffer = GST_BUFFER_CAST (item->object);
855
856             ts = GST_BUFFER_TIMESTAMP (buffer);
857           }
858           item->destroy (item);
859         }
860
861       }
862       if (!GST_CLOCK_TIME_IS_VALID (ts)) {
863         ts = gst_mss_stream_get_fragment_gst_timestamp
864             (stream->manifest_stream);
865       }
866
867       if (ts < newseg_ts)
868         newseg_ts = ts;
869
870       GST_DEBUG_OBJECT (mssdemux,
871           "Seeking stream %p %s to ts %" GST_TIME_FORMAT, stream,
872           GST_PAD_NAME (stream->pad), GST_TIME_ARGS (ts));
873       gst_mss_stream_seek (stream->manifest_stream, ts);
874       gst_data_queue_flush (stream->dataqueue);
875
876       stream->pad = _create_pad (mssdemux, stream->manifest_stream);
877       gst_mss_demux_expose_stream (mssdemux, stream);
878
879       gst_pad_push_event (oldpad, gst_event_new_eos ());
880     }
881
882     gst_element_no_more_pads (GST_ELEMENT (mssdemux));
883
884     for (iter = oldpads; iter; iter = g_slist_next (iter)) {
885       GstPad *oldpad = iter->data;
886
887       gst_pad_set_active (oldpad, FALSE);
888       gst_element_remove_pad (GST_ELEMENT (mssdemux), oldpad);
889       gst_object_unref (oldpad);
890     }
891     for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
892       GstMssDemuxStream *stream = iter->data;
893
894       stream->pending_newsegment =
895           gst_event_new_new_segment (TRUE, 1.0, GST_FORMAT_TIME, newseg_ts, -1,
896           newseg_ts);
897     }
898   }
899   gst_mss_demux_restart_tasks (mssdemux);
900 }
901
902 static void
903 _free_data_queue_item (gpointer obj)
904 {
905   GstDataQueueItem *item = obj;
906
907   gst_mini_object_unref (item->object);
908   g_slice_free (GstDataQueueItem, item);
909 }
910
911 static void
912 gst_mss_demux_stream_store_object (GstMssDemuxStream * stream,
913     GstMiniObject * obj)
914 {
915   GstDataQueueItem *item;
916
917   item = g_slice_new (GstDataQueueItem);
918   item->object = (GstMiniObject *) obj;
919
920   item->duration = 0;           /* we don't care */
921   item->size = 0;
922   item->visible = TRUE;
923
924   item->destroy = (GDestroyNotify) _free_data_queue_item;
925
926   if (!gst_data_queue_push (stream->dataqueue, item)) {
927     GST_DEBUG_OBJECT (stream->parent, "Failed to store object %p", obj);
928     gst_mini_object_unref (obj);
929     g_slice_free (GstDataQueueItem, item);
930   }
931 }
932
933 static GstFlowReturn
934 gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
935     GstBuffer ** buffer)
936 {
937   GstMssDemux *mssdemux = stream->parent;
938   gchar *path;
939   gchar *url;
940   GstFragment *fragment;
941   GstBuffer *_buffer;
942   GstFlowReturn ret = GST_FLOW_OK;
943
944   GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream);
945   ret = gst_mss_stream_get_fragment_url (stream->manifest_stream, &path);
946   switch (ret) {
947     case GST_FLOW_OK:
948       break;                    /* all is good, let's go */
949     case GST_FLOW_UNEXPECTED:  /* EOS */
950       gst_mss_demux_reload_manifest (mssdemux);
951       return GST_FLOW_OK;
952       return GST_FLOW_UNEXPECTED;
953     case GST_FLOW_ERROR:
954       goto error;
955     default:
956       break;
957   }
958   if (!path) {
959     goto no_url_error;
960   }
961   GST_DEBUG_OBJECT (mssdemux, "Got url path '%s' for stream %p", path, stream);
962
963   url = g_strdup_printf ("%s/%s", mssdemux->base_url, path);
964
965   GST_DEBUG_OBJECT (mssdemux, "Got url '%s' for stream %p", url, stream);
966
967   fragment = gst_uri_downloader_fetch_uri (stream->downloader, url);
968   g_free (path);
969   g_free (url);
970
971   if (!fragment) {
972     GST_INFO_OBJECT (mssdemux, "No fragment downloaded");
973     /* TODO check if we are truly stoping */
974     if (gst_mss_manifest_is_live (mssdemux->manifest)) {
975       /* looks like there is no way of knowing when a live stream has ended
976        * Have to assume we are falling behind and cause a manifest reload */
977       return GST_FLOW_OK;
978     }
979     return GST_FLOW_ERROR;
980   }
981
982   _buffer = gst_fragment_get_buffer (fragment);
983   _buffer = gst_buffer_make_metadata_writable (_buffer);
984   gst_buffer_set_caps (_buffer, GST_PAD_CAPS (stream->pad));
985   GST_BUFFER_TIMESTAMP (_buffer) =
986       gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
987   GST_BUFFER_DURATION (_buffer) =
988       gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
989
990   g_object_unref (fragment);
991
992   if (buffer)
993     *buffer = _buffer;
994
995   if (_buffer) {
996     GST_DEBUG_OBJECT (mssdemux,
997         "Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT
998         " Duration: %" GST_TIME_FORMAT,
999         stream, GST_PAD_NAME (stream->pad),
1000         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)),
1001         GST_TIME_ARGS (GST_BUFFER_DURATION (_buffer)));
1002     gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (_buffer));
1003   }
1004
1005   return ret;
1006
1007 no_url_error:
1008   {
1009     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
1010         (_("Failed to get fragment URL.")),
1011         ("An error happened when getting fragment URL"));
1012     gst_task_stop (stream->download_task);
1013     return GST_FLOW_ERROR;
1014   }
1015 error:
1016   {
1017     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
1018     gst_task_stop (stream->download_task);
1019     return GST_FLOW_ERROR;
1020   }
1021 }
1022
1023 static void
1024 gst_mss_demux_download_loop (GstMssDemuxStream * stream)
1025 {
1026   GstMssDemux *mssdemux = stream->parent;
1027   GstBuffer *buffer = NULL;
1028   GstFlowReturn ret;
1029
1030   GST_LOG_OBJECT (mssdemux, "download loop start %p", stream);
1031
1032
1033   ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
1034   switch (ret) {
1035     case GST_FLOW_OK:
1036       break;                    /* all is good, let's go */
1037     case GST_FLOW_UNEXPECTED:  /* EOS */
1038       goto eos;
1039     case GST_FLOW_ERROR:
1040       goto error;
1041     default:
1042       break;
1043   }
1044
1045   if (buffer) {
1046     gst_mss_stream_advance_fragment (stream->manifest_stream);
1047   }
1048   GST_LOG_OBJECT (mssdemux, "download loop end %p", stream);
1049   return;
1050
1051 eos:
1052   {
1053     GST_DEBUG_OBJECT (mssdemux, "Storing EOS for pad %s:%s",
1054         GST_DEBUG_PAD_NAME (stream->pad));
1055     gst_mss_demux_stream_store_object (stream,
1056         GST_MINI_OBJECT_CAST (gst_event_new_eos ()));
1057     gst_task_stop (stream->download_task);
1058     return;
1059   }
1060 error:
1061   {
1062     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
1063     gst_task_stop (stream->download_task);
1064     return;
1065   }
1066 }
1067
1068 static GstFlowReturn
1069 gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux,
1070     GstMssDemuxStream ** stream)
1071 {
1072   GstFlowReturn ret = GST_FLOW_OK;
1073   GstMssDemuxStream *current = NULL;
1074   GstClockTime cur_time = GST_CLOCK_TIME_NONE;
1075   GSList *iter;
1076
1077   if (!mssdemux->streams)
1078     return GST_FLOW_ERROR;
1079
1080   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
1081     GstClockTime time;
1082     GstMssDemuxStream *other;
1083     GstDataQueueItem *item;
1084
1085     other = iter->data;
1086     if (other->eos) {
1087       continue;
1088     }
1089
1090     if (gst_data_queue_peek (other->dataqueue, &item)) {
1091     } else {
1092       /* flushing */
1093       return GST_FLOW_WRONG_STATE;
1094     }
1095
1096     if (GST_IS_EVENT (item->object)) {
1097       /* events have higher priority */
1098       current = other;
1099       break;
1100     }
1101     time = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (item->object));
1102     if (time < cur_time) {
1103       cur_time = time;
1104       current = other;
1105     }
1106   }
1107
1108   *stream = current;
1109   if (current == NULL)
1110     ret = GST_FLOW_UNEXPECTED;
1111   return ret;
1112 }
1113
1114 static void
1115 gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
1116 {
1117   GstMssDemuxStream *stream = NULL;
1118   GstFlowReturn ret;
1119   GstMiniObject *object = NULL;
1120   GstDataQueueItem *item = NULL;
1121
1122   GST_LOG_OBJECT (mssdemux, "Starting stream loop");
1123
1124   GST_OBJECT_LOCK (mssdemux);
1125   if (mssdemux->update_bitrates) {
1126     mssdemux->update_bitrates = FALSE;
1127     GST_OBJECT_UNLOCK (mssdemux);
1128
1129     GST_DEBUG_OBJECT (mssdemux,
1130         "Starting streams reconfiguration due to bitrate changes");
1131     gst_mss_demux_reconfigure (mssdemux);
1132     GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration");
1133   } else {
1134     GST_OBJECT_UNLOCK (mssdemux);
1135   }
1136
1137   ret = gst_mss_demux_select_latest_stream (mssdemux, &stream);
1138
1139   if (stream)
1140     GST_DEBUG_OBJECT (mssdemux,
1141         "Stream loop selected %p stream of pad %s. %d - %s", stream,
1142         GST_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret));
1143   else
1144     GST_DEBUG_OBJECT (mssdemux, "No streams selected -> %d - %s", ret,
1145         gst_flow_get_name (ret));
1146
1147   switch (ret) {
1148     case GST_FLOW_OK:
1149       break;
1150     case GST_FLOW_ERROR:
1151       goto error;
1152     case GST_FLOW_UNEXPECTED:
1153       goto eos;
1154     case GST_FLOW_WRONG_STATE:
1155       GST_DEBUG_OBJECT (mssdemux, "Wrong state, stopping task");
1156       goto stop;
1157     default:
1158       g_assert_not_reached ();
1159   }
1160
1161   GST_LOG_OBJECT (mssdemux, "popping next item from queue for stream %p %s",
1162       stream, GST_PAD_NAME (stream->pad));
1163   if (gst_data_queue_pop (stream->dataqueue, &item)) {
1164     if (item->object)
1165       object = gst_mini_object_ref (item->object);
1166     item->destroy (item);
1167   } else {
1168     GST_DEBUG_OBJECT (mssdemux,
1169         "Failed to get object from dataqueue on stream %p %s", stream,
1170         GST_PAD_NAME (stream->pad));
1171     goto stop;
1172   }
1173
1174   if (G_UNLIKELY (stream->pending_newsegment)) {
1175     gst_pad_push_event (stream->pad, stream->pending_newsegment);
1176     stream->pending_newsegment = NULL;
1177   }
1178
1179   if (G_LIKELY (GST_IS_BUFFER (object))) {
1180     if (GST_BUFFER_TIMESTAMP (object) != stream->next_timestamp) {
1181       GST_ERROR_OBJECT (mssdemux, "Marking buffer %p as discont buffer:%"
1182           GST_TIME_FORMAT " != expected:%" GST_TIME_FORMAT, object,
1183           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
1184           GST_TIME_ARGS (stream->next_timestamp));
1185       GST_BUFFER_FLAG_SET (object, GST_BUFFER_FLAG_DISCONT);
1186     }
1187
1188     GST_DEBUG_OBJECT (mssdemux,
1189         "Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
1190         " discont:%d on pad %s", object,
1191         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
1192         GST_TIME_ARGS (GST_BUFFER_DURATION (object)),
1193         GST_BUFFER_FLAG_IS_SET (object, GST_BUFFER_FLAG_DISCONT),
1194         GST_PAD_NAME (stream->pad));
1195
1196     stream->next_timestamp =
1197         GST_BUFFER_TIMESTAMP (object) + GST_BUFFER_DURATION (object);
1198
1199     ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object));
1200   } else if (GST_IS_EVENT (object)) {
1201     if (GST_EVENT_TYPE (object) == GST_EVENT_EOS)
1202       stream->eos = TRUE;
1203     GST_DEBUG_OBJECT (mssdemux, "Pushing event %p on pad %s", object,
1204         GST_PAD_NAME (stream->pad));
1205     gst_pad_push_event (stream->pad, GST_EVENT_CAST (object));
1206   } else {
1207     g_return_if_reached ();
1208   }
1209
1210   switch (ret) {
1211     case GST_FLOW_UNEXPECTED:
1212       goto eos;                 /* EOS ? */
1213     case GST_FLOW_ERROR:
1214       goto error;
1215     case GST_FLOW_NOT_LINKED:
1216       break;                    /* TODO what to do here? pause the task or just keep pushing? */
1217     case GST_FLOW_OK:
1218     default:
1219       break;
1220   }
1221
1222   GST_LOG_OBJECT (mssdemux, "Stream loop end");
1223   return;
1224
1225 eos:
1226   {
1227     GST_DEBUG_OBJECT (mssdemux, "EOS on all pads");
1228     gst_task_stop (mssdemux->stream_task);
1229     return;
1230   }
1231 error:
1232   {
1233     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
1234     gst_task_stop (mssdemux->stream_task);
1235     return;
1236   }
1237 stop:
1238   {
1239     GST_DEBUG_OBJECT (mssdemux, "Stopping streaming task");
1240     gst_task_stop (mssdemux->stream_task);
1241     return;
1242   }
1243 }