mssdemux: use streams bitrate individually
[platform/upstream/gstreamer.git] / ext / smoothstreaming / gstmssdemux.c
1 /* GStreamer
2  * Copyright (C) 2012 Smart TV Alliance
3  *  Author: Thiago Sousa Santos <thiago.sousa.santos@collabora.com>, Collabora Ltd.
4  *
5  * gstmssdemux.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:element-mssdemux
25  *
26  * Demuxes a Microsoft's Smooth Streaming manifest into its audio and/or video streams.
27  *
28  *
29  */
30
31 /*
32  * == Internals
33  *
34  * = Smooth streaming in a few lines
35  * A SS stream is defined by a xml manifest file. This file has a list of
36  * tracks (StreamIndex), each one can have multiple QualityLevels, that define
37  * different encoding/bitrates. When playing a track, only one of those
38  * QualityLevels can be active at a time (per stream).
39  *
40  * The StreamIndex defines a URL with {time} and {bitrate} tags that are
41  * replaced by values indicated by the fragment start times and the selected
42  * QualityLevel, that generates the fragments URLs.
43  *
44  * Another relevant detail is that the Isomedia fragments for smoothstreaming
45  * won't contains a 'moov' atom, nor a 'stsd', so there is no information
46  * about the media type/configuration on the fragments, it must be extracted
47  * from the Manifest and passed downstream. mssdemux does this via GstCaps.
48  *
49  * = How mssdemux works
50  * There is a gstmssmanifest.c utility that holds the manifest and parses
51  * and has functions to extract information from it. mssdemux received the
52  * manifest from its sink pad and starts processing it when it gets EOS.
53  *
54  * The Manifest is parsed and the streams are exposed, 1 pad for each, with
55  * a initially selected QualityLevel. Each stream starts its own GstTaks that
56  * is responsible for downloading fragments and storing in its own GstDataQueue.
57  *
58  * The mssdemux starts another GstTask, this one iterates through the streams
59  * and selects the fragment with the smaller timestamp to push and repeats this.
60  *
61  * When a new connection-speed is set, mssdemux evaluates the available
62  * QualityLevels and might decide to switch to another one. In this case it
63  * exposes new pads for each stream, pushes EOS to the old ones and removes
64  * them. This should make decodebin2 pad switching mechanism act and the
65  * switch would be smooth for the final user.
66  */
67
68 #ifdef HAVE_CONFIG_H
69 #include "config.h"
70 #endif
71
72 #include "gst/gst-i18n-plugin.h"
73
74 #include <stdio.h>
75 #include <stdlib.h>
76 #include <string.h>
77
78 #include "gstmssdemux.h"
79
80 GST_DEBUG_CATEGORY (mssdemux_debug);
81
82 #define DEFAULT_CONNECTION_SPEED 0
83 #define DEFAULT_MAX_QUEUE_SIZE_BUFFERS 0
84 #define DEFAULT_BITRATE_LIMIT 0.8
85
86 #define DOWNLOAD_RATE_MAX_HISTORY_LENGTH 5
87
88 enum
89 {
90   PROP_0,
91
92   PROP_CONNECTION_SPEED,
93   PROP_MAX_QUEUE_SIZE_BUFFERS,
94   PROP_BITRATE_LIMIT,
95   PROP_LAST
96 };
97
98 static GstStaticPadTemplate gst_mss_demux_sink_template =
99 GST_STATIC_PAD_TEMPLATE ("sink",
100     GST_PAD_SINK,
101     GST_PAD_ALWAYS,
102     GST_STATIC_CAPS ("application/vnd.ms-sstr+xml")
103     );
104
105 static GstStaticPadTemplate gst_mss_demux_videosrc_template =
106 GST_STATIC_PAD_TEMPLATE ("video_%02u",
107     GST_PAD_SRC,
108     GST_PAD_SOMETIMES,
109     GST_STATIC_CAPS_ANY);
110
111 static GstStaticPadTemplate gst_mss_demux_audiosrc_template =
112 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
113     GST_PAD_SRC,
114     GST_PAD_SOMETIMES,
115     GST_STATIC_CAPS_ANY);
116
117 GST_BOILERPLATE (GstMssDemux, gst_mss_demux, GstMssDemux, GST_TYPE_ELEMENT);
118
119 static void gst_mss_demux_dispose (GObject * object);
120 static void gst_mss_demux_set_property (GObject * object, guint prop_id,
121     const GValue * value, GParamSpec * pspec);
122 static void gst_mss_demux_get_property (GObject * object, guint prop_id,
123     GValue * value, GParamSpec * pspec);
124 static GstStateChangeReturn gst_mss_demux_change_state (GstElement * element,
125     GstStateChange transition);
126 static GstFlowReturn gst_mss_demux_chain (GstPad * pad, GstBuffer * buffer);
127 static GstFlowReturn gst_mss_demux_event (GstPad * pad, GstEvent * event);
128
129 static gboolean gst_mss_demux_src_query (GstPad * pad, GstQuery * query);
130
131 static void gst_mss_demux_download_loop (GstMssDemuxStream * stream);
132 static void gst_mss_demux_stream_loop (GstMssDemux * mssdemux);
133
134 static gboolean gst_mss_demux_process_manifest (GstMssDemux * mssdemux);
135
136 static void
137 gst_mss_demux_base_init (gpointer klass)
138 {
139   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
140
141   gst_element_class_add_static_pad_template (element_class,
142       &gst_mss_demux_sink_template);
143   gst_element_class_add_static_pad_template (element_class,
144       &gst_mss_demux_videosrc_template);
145   gst_element_class_add_static_pad_template (element_class,
146       &gst_mss_demux_audiosrc_template);
147   gst_element_class_set_details_simple (element_class, "Smooth Streaming "
148       "demuxer", "Demuxer",
149       "Parse and demultiplex a Smooth Streaming manifest into audio and video "
150       "streams", "Thiago Santos <thiago.sousa.santos@collabora.com>");
151
152   GST_DEBUG_CATEGORY_INIT (mssdemux_debug, "mssdemux", 0, "mssdemux plugin");
153 }
154
155 static void
156 gst_mss_demux_class_init (GstMssDemuxClass * klass)
157 {
158   GObjectClass *gobject_class;
159   GstElementClass *gstelement_class;
160
161   gobject_class = (GObjectClass *) klass;
162   gstelement_class = (GstElementClass *) klass;
163
164   gobject_class->dispose = gst_mss_demux_dispose;
165   gobject_class->set_property = gst_mss_demux_set_property;
166   gobject_class->get_property = gst_mss_demux_get_property;
167
168   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
169       g_param_spec_uint ("connection-speed", "Connection Speed",
170           "Network connection speed in kbps (0 = unknown)",
171           0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
172           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
173
174   g_object_class_install_property (gobject_class, PROP_MAX_QUEUE_SIZE_BUFFERS,
175       g_param_spec_uint ("max-queue-size-buffers", "Max queue size in buffers",
176           "Maximum buffers that can be stored in each internal stream queue "
177           "(0 = infinite)", 0, G_MAXUINT, DEFAULT_MAX_QUEUE_SIZE_BUFFERS,
178           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
179
180   g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
181       g_param_spec_float ("bitrate-limit",
182           "Bitrate limit in %",
183           "Limit of the available bitrate to use when switching to alternates.",
184           0, 1, DEFAULT_BITRATE_LIMIT,
185           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
186
187   gstelement_class->change_state =
188       GST_DEBUG_FUNCPTR (gst_mss_demux_change_state);
189 }
190
191 static void
192 gst_mss_demux_init (GstMssDemux * mssdemux, GstMssDemuxClass * klass)
193 {
194   mssdemux->sinkpad =
195       gst_pad_new_from_static_template (&gst_mss_demux_sink_template, "sink");
196   gst_pad_set_chain_function (mssdemux->sinkpad,
197       GST_DEBUG_FUNCPTR (gst_mss_demux_chain));
198   gst_pad_set_event_function (mssdemux->sinkpad,
199       GST_DEBUG_FUNCPTR (gst_mss_demux_event));
200   gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), mssdemux->sinkpad);
201
202   g_static_rec_mutex_init (&mssdemux->stream_lock);
203   mssdemux->stream_task =
204       gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, mssdemux);
205   gst_task_set_lock (mssdemux->stream_task, &mssdemux->stream_lock);
206
207   mssdemux->data_queue_max_size = DEFAULT_MAX_QUEUE_SIZE_BUFFERS;
208   mssdemux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
209 }
210
211 static gboolean
212 _data_queue_check_full (GstDataQueue * queue, guint visible, guint bytes,
213     guint64 time, gpointer checkdata)
214 {
215   GstMssDemuxStream *stream = checkdata;
216   GstMssDemux *mssdemux = stream->parent;
217
218   if (mssdemux->data_queue_max_size == 0)
219     return FALSE;               /* never full */
220   return visible >= mssdemux->data_queue_max_size;
221 }
222
223 static GstMssDemuxStream *
224 gst_mss_demux_stream_new (GstMssDemux * mssdemux,
225     GstMssStream * manifeststream, GstPad * srcpad)
226 {
227   GstMssDemuxStream *stream;
228
229   stream = g_new0 (GstMssDemuxStream, 1);
230   stream->downloader = gst_uri_downloader_new ();
231   stream->dataqueue = gst_data_queue_new (_data_queue_check_full, stream);
232
233   /* Downloading task */
234   g_static_rec_mutex_init (&stream->download_lock);
235   stream->download_task =
236       gst_task_create ((GstTaskFunction) gst_mss_demux_download_loop, stream);
237   gst_task_set_lock (stream->download_task, &stream->download_lock);
238
239   stream->pad = srcpad;
240   stream->manifest_stream = manifeststream;
241   stream->parent = mssdemux;
242   gst_download_rate_init (&stream->download_rate);
243   gst_download_rate_set_max_length (&stream->download_rate,
244       DOWNLOAD_RATE_MAX_HISTORY_LENGTH);
245
246   return stream;
247 }
248
249 static void
250 gst_mss_demux_stream_free (GstMssDemuxStream * stream)
251 {
252   gst_download_rate_deinit (&stream->download_rate);
253   if (stream->download_task) {
254     if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
255       GST_DEBUG_OBJECT (stream->parent, "Leaving streaming task %s:%s",
256           GST_DEBUG_PAD_NAME (stream->pad));
257       gst_uri_downloader_cancel (stream->downloader);
258       gst_task_stop (stream->download_task);
259       g_static_rec_mutex_lock (&stream->download_lock);
260       g_static_rec_mutex_unlock (&stream->download_lock);
261       GST_LOG_OBJECT (stream->parent, "Waiting for task to finish");
262       gst_task_join (stream->download_task);
263       GST_LOG_OBJECT (stream->parent, "Finished");
264     }
265     gst_object_unref (stream->download_task);
266     g_static_rec_mutex_free (&stream->download_lock);
267     stream->download_task = NULL;
268   }
269
270   if (stream->pending_newsegment) {
271     gst_event_unref (stream->pending_newsegment);
272     stream->pending_newsegment = NULL;
273   }
274
275
276   if (stream->downloader != NULL) {
277     g_object_unref (stream->downloader);
278     stream->downloader = NULL;
279   }
280   if (stream->dataqueue) {
281     g_object_unref (stream->dataqueue);
282     stream->dataqueue = NULL;
283   }
284   if (stream->pad) {
285     gst_object_unref (stream->pad);
286     stream->pad = NULL;
287   }
288   g_free (stream);
289 }
290
291 static void
292 gst_mss_demux_reset (GstMssDemux * mssdemux)
293 {
294   GSList *iter;
295
296   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
297     GstMssDemuxStream *stream = iter->data;
298     if (stream->downloader)
299       gst_uri_downloader_cancel (stream->downloader);
300
301     gst_data_queue_set_flushing (stream->dataqueue, TRUE);
302   }
303
304   if (GST_TASK_STATE (mssdemux->stream_task) != GST_TASK_STOPPED) {
305     gst_task_stop (mssdemux->stream_task);
306     g_static_rec_mutex_lock (&mssdemux->stream_lock);
307     g_static_rec_mutex_unlock (&mssdemux->stream_lock);
308     gst_task_join (mssdemux->stream_task);
309   }
310
311   if (mssdemux->manifest_buffer) {
312     gst_buffer_unref (mssdemux->manifest_buffer);
313     mssdemux->manifest_buffer = NULL;
314   }
315
316   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
317     GstMssDemuxStream *stream = iter->data;
318     gst_element_remove_pad (GST_ELEMENT_CAST (mssdemux), stream->pad);
319     gst_mss_demux_stream_free (stream);
320   }
321   g_slist_free (mssdemux->streams);
322   mssdemux->streams = NULL;
323
324   if (mssdemux->manifest) {
325     gst_mss_manifest_free (mssdemux->manifest);
326     mssdemux->manifest = NULL;
327   }
328
329   mssdemux->n_videos = mssdemux->n_audios = 0;
330   g_free (mssdemux->base_url);
331   mssdemux->base_url = NULL;
332   g_free (mssdemux->manifest_uri);
333   mssdemux->manifest_uri = NULL;
334 }
335
336 static void
337 gst_mss_demux_dispose (GObject * object)
338 {
339   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (object);
340
341   gst_mss_demux_reset (mssdemux);
342
343   if (mssdemux->stream_task) {
344     gst_object_unref (mssdemux->stream_task);
345     g_static_rec_mutex_free (&mssdemux->stream_lock);
346     mssdemux->stream_task = NULL;
347   }
348
349   G_OBJECT_CLASS (parent_class)->dispose (object);
350 }
351
352 static void
353 gst_mss_demux_set_property (GObject * object, guint prop_id,
354     const GValue * value, GParamSpec * pspec)
355 {
356   GstMssDemux *mssdemux = GST_MSS_DEMUX (object);
357
358   switch (prop_id) {
359     case PROP_CONNECTION_SPEED:
360       GST_OBJECT_LOCK (mssdemux);
361       mssdemux->connection_speed = g_value_get_uint (value) * 1000;
362       mssdemux->update_bitrates = TRUE;
363       GST_DEBUG_OBJECT (mssdemux, "Connection speed set to %llu",
364           mssdemux->connection_speed);
365       GST_OBJECT_UNLOCK (mssdemux);
366       break;
367     case PROP_MAX_QUEUE_SIZE_BUFFERS:
368       mssdemux->data_queue_max_size = g_value_get_uint (value);
369       break;
370     case PROP_BITRATE_LIMIT:
371       mssdemux->bitrate_limit = g_value_get_float (value);
372       break;
373     default:
374       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
375       break;
376   }
377 }
378
379 static void
380 gst_mss_demux_get_property (GObject * object, guint prop_id, GValue * value,
381     GParamSpec * pspec)
382 {
383   GstMssDemux *mssdemux = GST_MSS_DEMUX (object);
384
385   switch (prop_id) {
386     case PROP_CONNECTION_SPEED:
387       g_value_set_uint (value, mssdemux->connection_speed / 1000);
388       break;
389     case PROP_MAX_QUEUE_SIZE_BUFFERS:
390       g_value_set_uint (value, mssdemux->data_queue_max_size);
391       break;
392     case PROP_BITRATE_LIMIT:
393       g_value_set_float (value, mssdemux->bitrate_limit);
394       break;
395     default:
396       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
397       break;
398   }
399 }
400
401 static GstStateChangeReturn
402 gst_mss_demux_change_state (GstElement * element, GstStateChange transition)
403 {
404   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (element);
405   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
406
407   switch (transition) {
408     case GST_STATE_CHANGE_PAUSED_TO_READY:
409       gst_mss_demux_reset (mssdemux);
410       break;
411     case GST_STATE_CHANGE_READY_TO_NULL:
412       break;
413     default:
414       break;
415   }
416
417   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
418
419   switch (transition) {
420     case GST_STATE_CHANGE_PAUSED_TO_READY:{
421       break;
422     }
423     default:
424       break;
425   }
426
427   return result;
428 }
429
430 static GstFlowReturn
431 gst_mss_demux_chain (GstPad * pad, GstBuffer * buffer)
432 {
433   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (GST_PAD_PARENT (pad));
434   if (mssdemux->manifest_buffer == NULL)
435     mssdemux->manifest_buffer = buffer;
436   else
437     mssdemux->manifest_buffer =
438         gst_buffer_join (mssdemux->manifest_buffer, buffer);
439
440   GST_INFO_OBJECT (mssdemux, "Received manifest buffer, total size is %i bytes",
441       GST_BUFFER_SIZE (mssdemux->manifest_buffer));
442
443   return GST_FLOW_OK;
444 }
445
446 static void
447 gst_mss_demux_start (GstMssDemux * mssdemux)
448 {
449   GSList *iter;
450
451   GST_INFO_OBJECT (mssdemux, "Starting streams' tasks");
452   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
453     GstMssDemuxStream *stream = iter->data;
454     gst_task_start (stream->download_task);
455   }
456
457   gst_task_start (mssdemux->stream_task);
458 }
459
460 static gboolean
461 gst_mss_demux_push_src_event (GstMssDemux * mssdemux, GstEvent * event)
462 {
463   GSList *iter;
464   gboolean ret = TRUE;
465
466   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
467     GstMssDemuxStream *stream = iter->data;
468     gst_event_ref (event);
469     ret = ret & gst_pad_push_event (stream->pad, event);
470   }
471   gst_event_unref (event);
472   return ret;
473 }
474
475 static gboolean
476 gst_mss_demux_event (GstPad * pad, GstEvent * event)
477 {
478   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (GST_PAD_PARENT (pad));
479   gboolean forward = TRUE;
480   gboolean ret = TRUE;
481
482   switch (GST_EVENT_TYPE (event)) {
483     case GST_EVENT_FLUSH_STOP:
484       gst_mss_demux_reset (mssdemux);
485       break;
486     case GST_EVENT_EOS:
487       if (mssdemux->manifest_buffer == NULL) {
488         GST_WARNING_OBJECT (mssdemux, "Received EOS without a manifest.");
489         break;
490       }
491       GST_INFO_OBJECT (mssdemux, "Received EOS");
492
493       if (gst_mss_demux_process_manifest (mssdemux))
494         gst_mss_demux_start (mssdemux);
495       forward = FALSE;
496       break;
497     default:
498       break;
499   }
500
501   if (forward) {
502     ret = gst_pad_event_default (pad, event);
503   } else {
504     gst_event_unref (event);
505   }
506
507   return ret;
508 }
509
510 static void
511 gst_mss_demux_stop_tasks (GstMssDemux * mssdemux, gboolean immediate)
512 {
513   GSList *iter;
514
515   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
516     GstMssDemuxStream *stream = iter->data;
517
518     gst_data_queue_set_flushing (stream->dataqueue, TRUE);
519
520     if (immediate)
521       gst_uri_downloader_cancel (stream->downloader);
522     gst_task_pause (stream->download_task);
523   }
524   gst_task_pause (mssdemux->stream_task);
525
526   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
527     GstMssDemuxStream *stream = iter->data;
528     g_static_rec_mutex_lock (&stream->download_lock);
529   }
530   g_static_rec_mutex_lock (&mssdemux->stream_lock);
531 }
532
533 static void
534 gst_mss_demux_restart_tasks (GstMssDemux * mssdemux)
535 {
536   GSList *iter;
537   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
538     GstMssDemuxStream *stream = iter->data;
539     gst_uri_downloader_reset (stream->downloader);
540     g_static_rec_mutex_unlock (&stream->download_lock);
541   }
542   g_static_rec_mutex_unlock (&mssdemux->stream_lock);
543   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
544     GstMssDemuxStream *stream = iter->data;
545
546     gst_data_queue_set_flushing (stream->dataqueue, FALSE);
547     gst_task_start (stream->download_task);
548   }
549   gst_task_start (mssdemux->stream_task);
550 }
551
552 static gboolean
553 gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
554 {
555   GstMssDemux *mssdemux;
556
557   mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad));
558
559   switch (event->type) {
560     case GST_EVENT_SEEK:
561     {
562       gdouble rate;
563       GstFormat format;
564       GstSeekFlags flags;
565       GstSeekType start_type, stop_type;
566       gint64 start, stop;
567       GstEvent *newsegment;
568       GSList *iter;
569
570       GST_INFO_OBJECT (mssdemux, "Received GST_EVENT_SEEK");
571
572       gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
573           &stop_type, &stop);
574
575       if (format != GST_FORMAT_TIME)
576         goto not_supported;
577
578       GST_DEBUG_OBJECT (mssdemux,
579           "seek event, rate: %f start: %" GST_TIME_FORMAT " stop: %"
580           GST_TIME_FORMAT, rate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
581
582       if (flags & GST_SEEK_FLAG_FLUSH) {
583         GstEvent *flush = gst_event_new_flush_start ();
584         GST_DEBUG_OBJECT (mssdemux, "sending flush start");
585
586         gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
587         gst_mss_demux_push_src_event (mssdemux, flush);
588       }
589
590       gst_mss_demux_stop_tasks (mssdemux, TRUE);
591
592       if (!gst_mss_manifest_seek (mssdemux->manifest, start)) {;
593         GST_WARNING_OBJECT (mssdemux, "Could not find seeked fragment");
594         goto not_supported;
595       }
596
597       newsegment =
598           gst_event_new_new_segment (FALSE, rate, format, start, stop, start);
599       gst_event_set_seqnum (newsegment, gst_event_get_seqnum (event));
600       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
601         GstMssDemuxStream *stream = iter->data;
602
603         stream->eos = FALSE;
604         gst_data_queue_flush (stream->dataqueue);
605         gst_event_ref (newsegment);
606         gst_event_replace (&stream->pending_newsegment, newsegment);
607       }
608       gst_event_unref (newsegment);
609
610       if (flags & GST_SEEK_FLAG_FLUSH) {
611         GstEvent *flush = gst_event_new_flush_stop ();
612         GST_DEBUG_OBJECT (mssdemux, "sending flush stop");
613
614         gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
615         gst_mss_demux_push_src_event (mssdemux, flush);
616       }
617
618       gst_mss_demux_restart_tasks (mssdemux);
619
620       gst_event_unref (event);
621       return TRUE;
622     }
623     default:
624       break;
625   }
626
627   return gst_pad_event_default (pad, event);
628
629 not_supported:
630   gst_event_unref (event);
631   return FALSE;
632 }
633
634 static gboolean
635 gst_mss_demux_src_query (GstPad * pad, GstQuery * query)
636 {
637   GstMssDemux *mssdemux;
638   gboolean ret = FALSE;
639
640   if (query == NULL)
641     return FALSE;
642
643   mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad));
644
645   switch (query->type) {
646     case GST_QUERY_DURATION:{
647       GstClockTime duration = -1;
648       GstFormat fmt;
649
650       gst_query_parse_duration (query, &fmt, NULL);
651       if (fmt == GST_FORMAT_TIME && mssdemux->manifest) {
652         /* TODO should we use the streams accumulated duration or the main manifest duration? */
653         duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
654
655         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
656           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
657           ret = TRUE;
658         }
659       }
660       GST_INFO_OBJECT (mssdemux, "GST_QUERY_DURATION returns %s with duration %"
661           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
662       break;
663     }
664     case GST_QUERY_LATENCY:{
665       gboolean live = FALSE;
666
667       live = mssdemux->manifest
668           && gst_mss_manifest_is_live (mssdemux->manifest);
669
670       gst_query_set_latency (query, live, 0, -1);
671       ret = TRUE;
672       break;
673     }
674     case GST_QUERY_SEEKING:{
675       GstFormat fmt;
676       gint64 stop = -1;
677
678       if (mssdemux->manifest && gst_mss_manifest_is_live (mssdemux->manifest)) {
679         return FALSE;           /* no live seeking */
680       }
681
682       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
683       GST_INFO_OBJECT (mssdemux, "Received GST_QUERY_SEEKING with format %d",
684           fmt);
685       if (fmt == GST_FORMAT_TIME) {
686         GstClockTime duration;
687         duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
688         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
689           stop = duration;
690         gst_query_set_seeking (query, fmt, TRUE, 0, stop);
691         ret = TRUE;
692         GST_INFO_OBJECT (mssdemux, "GST_QUERY_SEEKING returning with stop : %"
693             GST_TIME_FORMAT, GST_TIME_ARGS (stop));
694       }
695       break;
696     }
697     default:
698       /* Don't fordward queries upstream because of the special nature of this
699        *  "demuxer", which relies on the upstream element only to be fed
700        *  the Manifest
701        */
702       break;
703   }
704
705   return ret;
706 }
707
708 static void
709 _set_src_pad_functions (GstPad * pad)
710 {
711   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_query));
712   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_event));
713 }
714
715 static GstPad *
716 _create_pad (GstMssDemux * mssdemux, GstMssStream * manifeststream)
717 {
718   gchar *name;
719   GstPad *srcpad = NULL;
720   GstMssStreamType streamtype;
721
722   streamtype = gst_mss_stream_get_type (manifeststream);
723   GST_DEBUG_OBJECT (mssdemux, "Found stream of type: %s",
724       gst_mss_stream_type_name (streamtype));
725
726   /* TODO use stream's name/bitrate/index as the pad name? */
727   if (streamtype == MSS_STREAM_TYPE_VIDEO) {
728     name = g_strdup_printf ("video_%02u", mssdemux->n_videos++);
729     srcpad =
730         gst_pad_new_from_static_template (&gst_mss_demux_videosrc_template,
731         name);
732     g_free (name);
733   } else if (streamtype == MSS_STREAM_TYPE_AUDIO) {
734     name = g_strdup_printf ("audio_%02u", mssdemux->n_audios++);
735     srcpad =
736         gst_pad_new_from_static_template (&gst_mss_demux_audiosrc_template,
737         name);
738     g_free (name);
739   }
740
741   if (!srcpad) {
742     GST_WARNING_OBJECT (mssdemux, "Ignoring unknown type stream");
743     return NULL;
744   }
745
746   _set_src_pad_functions (srcpad);
747   return srcpad;
748 }
749
750 static void
751 gst_mss_demux_create_streams (GstMssDemux * mssdemux)
752 {
753   GSList *streams = gst_mss_manifest_get_streams (mssdemux->manifest);
754   GSList *iter;
755
756   if (streams == NULL) {
757     GST_INFO_OBJECT (mssdemux, "No streams found in the manifest");
758     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
759         (_("This file contains no playable streams.")),
760         ("no streams found at the Manifest"));
761     return;
762   }
763
764   for (iter = streams; iter; iter = g_slist_next (iter)) {
765     GstPad *srcpad = NULL;
766     GstMssDemuxStream *stream = NULL;
767     GstMssStream *manifeststream = iter->data;
768
769     srcpad = _create_pad (mssdemux, manifeststream);
770
771     if (!srcpad) {
772       continue;
773     }
774
775     stream = gst_mss_demux_stream_new (mssdemux, manifeststream, srcpad);
776     gst_mss_stream_set_active (manifeststream, TRUE);
777     mssdemux->streams = g_slist_append (mssdemux->streams, stream);
778   }
779
780   /* select initial bitrates */
781   GST_OBJECT_LOCK (mssdemux);
782   GST_INFO_OBJECT (mssdemux, "Changing max bitrate to %llu",
783       mssdemux->connection_speed);
784   gst_mss_manifest_change_bitrate (mssdemux->manifest,
785       mssdemux->connection_speed);
786   mssdemux->update_bitrates = FALSE;
787   GST_OBJECT_UNLOCK (mssdemux);
788 }
789
790 static gboolean
791 gst_mss_demux_expose_stream (GstMssDemux * mssdemux, GstMssDemuxStream * stream)
792 {
793   GstCaps *caps;
794   GstCaps *media_caps;
795   GstPad *pad = stream->pad;
796
797   media_caps = gst_mss_stream_get_caps (stream->manifest_stream);
798
799   if (media_caps) {
800     caps = gst_caps_new_simple ("video/quicktime", "variant", G_TYPE_STRING,
801         "mss-fragmented", "timescale", G_TYPE_UINT64,
802         gst_mss_stream_get_timescale (stream->manifest_stream), "media-caps",
803         GST_TYPE_CAPS, media_caps, NULL);
804     gst_caps_unref (media_caps);
805     gst_pad_set_caps (pad, caps);
806     gst_caps_unref (caps);
807
808     gst_pad_set_active (pad, TRUE);
809     GST_INFO_OBJECT (mssdemux, "Adding srcpad %s:%s with caps %" GST_PTR_FORMAT,
810         GST_DEBUG_PAD_NAME (pad), caps);
811     gst_object_ref (pad);
812     gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), pad);
813   } else {
814     GST_WARNING_OBJECT (mssdemux,
815         "Couldn't get caps from manifest stream %p %s, not exposing it", stream,
816         GST_PAD_NAME (stream->pad));
817     return FALSE;
818   }
819   return TRUE;
820 }
821
822 static gboolean
823 gst_mss_demux_process_manifest (GstMssDemux * mssdemux)
824 {
825   GstQuery *query;
826   gchar *uri = NULL;
827   gboolean ret;
828   GSList *iter;
829
830   g_return_val_if_fail (mssdemux->manifest_buffer != NULL, FALSE);
831   g_return_val_if_fail (mssdemux->manifest == NULL, FALSE);
832
833   query = gst_query_new_uri ();
834   ret = gst_pad_peer_query (mssdemux->sinkpad, query);
835   if (ret) {
836     gchar *baseurl_end;
837     gst_query_parse_uri (query, &uri);
838     GST_INFO_OBJECT (mssdemux, "Upstream is using URI: %s", uri);
839
840     mssdemux->manifest_uri = g_strdup (uri);
841     baseurl_end = g_strrstr (uri, "/Manifest");
842     if (baseurl_end) {
843       /* set the new end of the string */
844       baseurl_end[0] = '\0';
845     } else {
846       GST_WARNING_OBJECT (mssdemux, "Stream's URI didn't end with /manifest");
847     }
848
849     mssdemux->base_url = uri;
850   }
851   gst_query_unref (query);
852
853   if (mssdemux->base_url == NULL) {
854     GST_ELEMENT_ERROR (mssdemux, RESOURCE, NOT_FOUND,
855         (_("Couldn't get the Manifest's URI")),
856         ("need to get the manifest's URI from upstream elements"));
857     return FALSE;
858   }
859
860   GST_INFO_OBJECT (mssdemux, "Received manifest: %i bytes",
861       GST_BUFFER_SIZE (mssdemux->manifest_buffer));
862
863   mssdemux->manifest = gst_mss_manifest_new (mssdemux->manifest_buffer);
864   if (!mssdemux->manifest) {
865     GST_ELEMENT_ERROR (mssdemux, STREAM, FORMAT, ("Bad manifest file"),
866         ("Xml manifest file couldn't be parsed"));
867     return FALSE;
868   }
869
870   GST_INFO_OBJECT (mssdemux, "Live stream: %d",
871       gst_mss_manifest_is_live (mssdemux->manifest));
872
873   gst_mss_demux_create_streams (mssdemux);
874   for (iter = mssdemux->streams; iter;) {
875     GSList *current = iter;
876     GstMssDemuxStream *stream = iter->data;
877     iter = g_slist_next (iter); /* do it ourselves as we want it done in the beginning of the loop */
878     if (!gst_mss_demux_expose_stream (mssdemux, stream)) {
879       gst_mss_demux_stream_free (stream);
880       mssdemux->streams = g_slist_delete_link (mssdemux->streams, current);
881     }
882   }
883
884   if (!mssdemux->streams) {
885     /* no streams */
886     GST_WARNING_OBJECT (mssdemux, "Couldn't identify the caps for any of the "
887         "streams found in the manifest");
888     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
889         (_("This file contains no playable streams.")),
890         ("No known stream formats found at the Manifest"));
891     return FALSE;
892   }
893
894   gst_element_no_more_pads (GST_ELEMENT_CAST (mssdemux));
895   return TRUE;
896 }
897
898 static void
899 gst_mss_demux_reload_manifest (GstMssDemux * mssdemux)
900 {
901   GstUriDownloader *downloader;
902   GstFragment *manifest_data;
903   GstBuffer *manifest_buffer;
904
905   downloader = gst_uri_downloader_new ();
906
907   manifest_data =
908       gst_uri_downloader_fetch_uri (downloader, mssdemux->manifest_uri);
909   manifest_buffer = gst_fragment_get_buffer (manifest_data);
910   g_object_unref (manifest_data);
911
912   gst_mss_manifest_reload_fragments (mssdemux->manifest, manifest_buffer);
913   gst_buffer_replace (&mssdemux->manifest_buffer, manifest_buffer);
914   gst_buffer_unref (manifest_buffer);
915
916   g_object_unref (downloader);
917 }
918
919 static guint64
920 gst_mss_demux_get_download_bitrate (GstMssDemux * mssdemux)
921 {
922   GSList *iter;
923   guint64 total = 0;
924   guint64 count = 0;
925
926   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
927     GstMssDemuxStream *stream = iter->data;
928
929     total += gst_download_rate_get_current_rate (&stream->download_rate);
930     count++;
931   }
932
933   return total / count;
934 }
935
936 static gboolean
937 gst_mss_demux_all_streams_have_data (GstMssDemux * mssdemux)
938 {
939   GSList *iter;
940
941   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
942     GstMssDemuxStream *stream = iter->data;
943
944     if (!stream->have_data)
945       return FALSE;
946   }
947
948   return TRUE;
949 }
950
951 static void
952 gst_mss_demux_reconfigure (GstMssDemux * mssdemux)
953 {
954   GSList *oldpads = NULL;
955   GSList *iter;
956   guint64 new_bitrate;
957   gboolean changed = FALSE;
958
959   /* TODO lock? */
960
961   if (!gst_mss_demux_all_streams_have_data (mssdemux))
962     return;
963
964   gst_mss_demux_stop_tasks (mssdemux, TRUE);
965   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
966     GstMssDemuxStream *stream;
967
968     stream = iter->data;
969
970     new_bitrate =
971         mssdemux->bitrate_limit *
972         gst_download_rate_get_current_rate (&stream->download_rate);
973     if (mssdemux->connection_speed) {
974       new_bitrate = MIN (mssdemux->connection_speed, new_bitrate);
975     }
976
977     GST_DEBUG_OBJECT (mssdemux, "Current stream %s download bitrate %llu",
978         GST_PAD_NAME (stream->pad), new_bitrate);
979
980     if (gst_mss_stream_select_bitrate (stream->manifest_stream, new_bitrate)) {
981       changed = TRUE;
982       GST_INFO_OBJECT (mssdemux, "Stream %s changed bitrate to %llu",
983           GST_PAD_NAME (stream->pad),
984           gst_mss_stream_get_current_bitrate (stream->manifest_stream));
985     }
986   }
987
988   if (changed) {
989     GstClockTime newseg_ts = GST_CLOCK_TIME_NONE;
990
991     GST_DEBUG_OBJECT (mssdemux,
992         "Bitrates have changed, creating new pad group");
993     /* if we changed the bitrate, we need to add new pads */
994     for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
995       GstMssDemuxStream *stream = iter->data;
996       GstPad *oldpad = stream->pad;
997       GstClockTime ts = GST_CLOCK_TIME_NONE;
998
999       oldpads = g_slist_prepend (oldpads, oldpad);
1000
1001       /* since we are flushing the queue, get the next un-pushed timestamp to seek
1002        * and avoid gaps */
1003       gst_data_queue_set_flushing (stream->dataqueue, FALSE);
1004       if (!gst_data_queue_is_empty (stream->dataqueue)) {
1005         GstDataQueueItem *item = NULL;
1006
1007         while (!gst_data_queue_is_empty (stream->dataqueue)
1008             && !GST_CLOCK_TIME_IS_VALID (ts)) {
1009           gst_data_queue_pop (stream->dataqueue, &item);
1010
1011           if (!item) {
1012             g_assert_not_reached ();
1013             break;
1014           }
1015
1016           if (GST_IS_BUFFER (item->object)) {
1017             GstBuffer *buffer = GST_BUFFER_CAST (item->object);
1018
1019             ts = GST_BUFFER_TIMESTAMP (buffer);
1020           }
1021           item->destroy (item);
1022         }
1023
1024       }
1025       if (!GST_CLOCK_TIME_IS_VALID (ts)) {
1026         ts = gst_mss_stream_get_fragment_gst_timestamp
1027             (stream->manifest_stream);
1028       }
1029
1030       if (ts < newseg_ts)
1031         newseg_ts = ts;
1032
1033       GST_DEBUG_OBJECT (mssdemux,
1034           "Seeking stream %p %s to ts %" GST_TIME_FORMAT, stream,
1035           GST_PAD_NAME (stream->pad), GST_TIME_ARGS (ts));
1036       gst_mss_stream_seek (stream->manifest_stream, ts);
1037       gst_data_queue_flush (stream->dataqueue);
1038
1039       stream->pad = _create_pad (mssdemux, stream->manifest_stream);
1040       gst_mss_demux_expose_stream (mssdemux, stream);
1041
1042       gst_pad_push_event (oldpad, gst_event_new_eos ());
1043       stream->have_data = FALSE;
1044     }
1045
1046     gst_element_no_more_pads (GST_ELEMENT (mssdemux));
1047
1048     for (iter = oldpads; iter; iter = g_slist_next (iter)) {
1049       GstPad *oldpad = iter->data;
1050
1051       gst_pad_set_active (oldpad, FALSE);
1052       gst_element_remove_pad (GST_ELEMENT (mssdemux), oldpad);
1053       gst_object_unref (oldpad);
1054     }
1055     for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
1056       GstMssDemuxStream *stream = iter->data;
1057
1058       stream->pending_newsegment =
1059           gst_event_new_new_segment (TRUE, 1.0, GST_FORMAT_TIME, newseg_ts, -1,
1060           newseg_ts);
1061     }
1062   }
1063   gst_mss_demux_restart_tasks (mssdemux);
1064 }
1065
1066 static void
1067 _free_data_queue_item (gpointer obj)
1068 {
1069   GstDataQueueItem *item = obj;
1070
1071   gst_mini_object_unref (item->object);
1072   g_slice_free (GstDataQueueItem, item);
1073 }
1074
1075 static void
1076 gst_mss_demux_stream_store_object (GstMssDemuxStream * stream,
1077     GstMiniObject * obj)
1078 {
1079   GstDataQueueItem *item;
1080
1081   item = g_slice_new (GstDataQueueItem);
1082   item->object = (GstMiniObject *) obj;
1083
1084   item->duration = 0;           /* we don't care */
1085   item->size = 0;
1086   item->visible = TRUE;
1087
1088   item->destroy = (GDestroyNotify) _free_data_queue_item;
1089
1090   if (!gst_data_queue_push (stream->dataqueue, item)) {
1091     GST_DEBUG_OBJECT (stream->parent, "Failed to store object %p", obj);
1092     item->destroy (item);
1093   }
1094 }
1095
1096 static GstFlowReturn
1097 gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
1098     GstBuffer ** buffer)
1099 {
1100   GstMssDemux *mssdemux = stream->parent;
1101   gchar *path;
1102   gchar *url;
1103   GstFragment *fragment;
1104   GstBuffer *_buffer;
1105   GstFlowReturn ret = GST_FLOW_OK;
1106   guint64 before_download, after_download;
1107
1108   before_download = g_get_real_time ();
1109
1110   GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream);
1111   ret = gst_mss_stream_get_fragment_url (stream->manifest_stream, &path);
1112   switch (ret) {
1113     case GST_FLOW_OK:
1114       break;                    /* all is good, let's go */
1115     case GST_FLOW_UNEXPECTED:  /* EOS */
1116       if (gst_mss_manifest_is_live (mssdemux->manifest)) {
1117         gst_mss_demux_reload_manifest (mssdemux);
1118         return GST_FLOW_OK;
1119       }
1120       return GST_FLOW_UNEXPECTED;
1121     case GST_FLOW_ERROR:
1122       goto error;
1123     default:
1124       break;
1125   }
1126   if (!path) {
1127     goto no_url_error;
1128   }
1129   GST_DEBUG_OBJECT (mssdemux, "Got url path '%s' for stream %p", path, stream);
1130
1131   url = g_strdup_printf ("%s/%s", mssdemux->base_url, path);
1132
1133   GST_DEBUG_OBJECT (mssdemux, "Got url '%s' for stream %p", url, stream);
1134
1135   fragment = gst_uri_downloader_fetch_uri (stream->downloader, url);
1136   g_free (path);
1137   g_free (url);
1138
1139   if (!fragment) {
1140     GST_INFO_OBJECT (mssdemux, "No fragment downloaded");
1141     /* TODO check if we are truly stoping */
1142     if (gst_mss_manifest_is_live (mssdemux->manifest)) {
1143       /* looks like there is no way of knowing when a live stream has ended
1144        * Have to assume we are falling behind and cause a manifest reload */
1145       return GST_FLOW_OK;
1146     }
1147     return GST_FLOW_ERROR;
1148   }
1149
1150   _buffer = gst_fragment_get_buffer (fragment);
1151   _buffer = gst_buffer_make_metadata_writable (_buffer);
1152   gst_buffer_set_caps (_buffer, GST_PAD_CAPS (stream->pad));
1153   GST_BUFFER_TIMESTAMP (_buffer) =
1154       gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
1155   GST_BUFFER_DURATION (_buffer) =
1156       gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
1157
1158   g_object_unref (fragment);
1159
1160   if (buffer)
1161     *buffer = _buffer;
1162
1163   after_download = g_get_real_time ();
1164   if (_buffer) {
1165     guint64 bitrate = (8 * GST_BUFFER_SIZE (_buffer) * 1000000LLU) /
1166         (after_download - before_download);
1167
1168     GST_DEBUG_OBJECT (mssdemux, "Measured download bitrate: %s %llu bps",
1169         GST_PAD_NAME (stream->pad), bitrate);
1170     gst_download_rate_add_rate (&stream->download_rate,
1171         GST_BUFFER_SIZE (_buffer), 1000 * (after_download - before_download));
1172
1173     GST_DEBUG_OBJECT (mssdemux,
1174         "Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT
1175         " Duration: %" GST_TIME_FORMAT,
1176         stream, GST_PAD_NAME (stream->pad),
1177         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)),
1178         GST_TIME_ARGS (GST_BUFFER_DURATION (_buffer)));
1179     gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (_buffer));
1180   }
1181
1182
1183   GST_OBJECT_LOCK (mssdemux);
1184   mssdemux->update_bitrates = TRUE;
1185   GST_OBJECT_UNLOCK (mssdemux);
1186
1187   return ret;
1188
1189 no_url_error:
1190   {
1191     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
1192         (_("Failed to get fragment URL.")),
1193         ("An error happened when getting fragment URL"));
1194     gst_task_pause (stream->download_task);
1195     return GST_FLOW_ERROR;
1196   }
1197 error:
1198   {
1199     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
1200     gst_task_pause (stream->download_task);
1201     return GST_FLOW_ERROR;
1202   }
1203 }
1204
1205 static void
1206 gst_mss_demux_download_loop (GstMssDemuxStream * stream)
1207 {
1208   GstMssDemux *mssdemux = stream->parent;
1209   GstBuffer *buffer = NULL;
1210   GstFlowReturn ret;
1211
1212   GST_LOG_OBJECT (mssdemux, "download loop start %p", stream);
1213
1214   ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
1215   switch (ret) {
1216     case GST_FLOW_OK:
1217       break;                    /* all is good, let's go */
1218     case GST_FLOW_UNEXPECTED:  /* EOS */
1219       goto eos;
1220     case GST_FLOW_ERROR:
1221       goto error;
1222     default:
1223       break;
1224   }
1225
1226   if (buffer) {
1227     gst_mss_stream_advance_fragment (stream->manifest_stream);
1228   }
1229   GST_LOG_OBJECT (mssdemux, "download loop end %p", stream);
1230   return;
1231
1232 eos:
1233   {
1234     GST_DEBUG_OBJECT (mssdemux, "Storing EOS for pad %s:%s",
1235         GST_DEBUG_PAD_NAME (stream->pad));
1236     gst_mss_demux_stream_store_object (stream,
1237         GST_MINI_OBJECT_CAST (gst_event_new_eos ()));
1238     gst_task_pause (stream->download_task);
1239     return;
1240   }
1241 error:
1242   {
1243     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
1244     gst_task_pause (stream->download_task);
1245     return;
1246   }
1247 }
1248
1249 static GstFlowReturn
1250 gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux,
1251     GstMssDemuxStream ** stream)
1252 {
1253   GstFlowReturn ret = GST_FLOW_OK;
1254   GstMssDemuxStream *current = NULL;
1255   GstClockTime cur_time = GST_CLOCK_TIME_NONE;
1256   GSList *iter;
1257
1258   if (!mssdemux->streams)
1259     return GST_FLOW_ERROR;
1260
1261   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
1262     GstClockTime time;
1263     GstMssDemuxStream *other;
1264     GstDataQueueItem *item;
1265
1266     other = iter->data;
1267     if (other->eos) {
1268       continue;
1269     }
1270
1271     if (!gst_data_queue_peek (other->dataqueue, &item)) {
1272       /* flushing */
1273       return GST_FLOW_WRONG_STATE;
1274     }
1275
1276     if (GST_IS_EVENT (item->object)) {
1277       /* events have higher priority */
1278       current = other;
1279       break;
1280     }
1281     time = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (item->object));
1282     if (time < cur_time) {
1283       cur_time = time;
1284       current = other;
1285     }
1286   }
1287
1288   *stream = current;
1289   if (current == NULL)
1290     ret = GST_FLOW_UNEXPECTED;
1291   return ret;
1292 }
1293
1294 static void
1295 gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
1296 {
1297   GstMssDemuxStream *stream = NULL;
1298   GstFlowReturn ret;
1299   GstMiniObject *object = NULL;
1300   GstDataQueueItem *item = NULL;
1301
1302   GST_LOG_OBJECT (mssdemux, "Starting stream loop");
1303
1304   GST_OBJECT_LOCK (mssdemux);
1305   if (mssdemux->update_bitrates) {
1306     mssdemux->update_bitrates = FALSE;
1307     GST_OBJECT_UNLOCK (mssdemux);
1308
1309     GST_DEBUG_OBJECT (mssdemux,
1310         "Starting streams reconfiguration due to bitrate changes");
1311     gst_mss_demux_reconfigure (mssdemux);
1312     GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration");
1313   } else {
1314     GST_OBJECT_UNLOCK (mssdemux);
1315   }
1316
1317   ret = gst_mss_demux_select_latest_stream (mssdemux, &stream);
1318
1319   if (stream)
1320     GST_DEBUG_OBJECT (mssdemux,
1321         "Stream loop selected %p stream of pad %s. %d - %s", stream,
1322         GST_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret));
1323   else
1324     GST_DEBUG_OBJECT (mssdemux, "No streams selected -> %d - %s", ret,
1325         gst_flow_get_name (ret));
1326
1327   switch (ret) {
1328     case GST_FLOW_OK:
1329       break;
1330     case GST_FLOW_ERROR:
1331       goto error;
1332     case GST_FLOW_UNEXPECTED:
1333       goto eos;
1334     case GST_FLOW_WRONG_STATE:
1335       GST_DEBUG_OBJECT (mssdemux, "Wrong state, stopping task");
1336       goto stop;
1337     default:
1338       g_assert_not_reached ();
1339   }
1340
1341   GST_LOG_OBJECT (mssdemux, "popping next item from queue for stream %p %s",
1342       stream, GST_PAD_NAME (stream->pad));
1343   if (gst_data_queue_pop (stream->dataqueue, &item)) {
1344     if (item->object)
1345       object = gst_mini_object_ref (item->object);
1346     item->destroy (item);
1347   } else {
1348     GST_DEBUG_OBJECT (mssdemux,
1349         "Failed to get object from dataqueue on stream %p %s", stream,
1350         GST_PAD_NAME (stream->pad));
1351     goto stop;
1352   }
1353
1354   if (G_UNLIKELY (stream->pending_newsegment)) {
1355     gst_pad_push_event (stream->pad, stream->pending_newsegment);
1356     stream->pending_newsegment = NULL;
1357   }
1358
1359   if (G_LIKELY (GST_IS_BUFFER (object))) {
1360     if (GST_BUFFER_TIMESTAMP (object) != stream->next_timestamp) {
1361       GST_ERROR_OBJECT (mssdemux, "Marking buffer %p as discont buffer:%"
1362           GST_TIME_FORMAT " != expected:%" GST_TIME_FORMAT, object,
1363           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
1364           GST_TIME_ARGS (stream->next_timestamp));
1365       GST_BUFFER_FLAG_SET (object, GST_BUFFER_FLAG_DISCONT);
1366     }
1367
1368     GST_DEBUG_OBJECT (mssdemux,
1369         "Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
1370         " discont:%d on pad %s", object,
1371         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
1372         GST_TIME_ARGS (GST_BUFFER_DURATION (object)),
1373         GST_BUFFER_FLAG_IS_SET (object, GST_BUFFER_FLAG_DISCONT),
1374         GST_PAD_NAME (stream->pad));
1375
1376     stream->next_timestamp =
1377         GST_BUFFER_TIMESTAMP (object) + GST_BUFFER_DURATION (object);
1378
1379     stream->have_data = TRUE;
1380     ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object));
1381   } else if (GST_IS_EVENT (object)) {
1382     if (GST_EVENT_TYPE (object) == GST_EVENT_EOS)
1383       stream->eos = TRUE;
1384     GST_DEBUG_OBJECT (mssdemux, "Pushing event %p on pad %s", object,
1385         GST_PAD_NAME (stream->pad));
1386     gst_pad_push_event (stream->pad, GST_EVENT_CAST (object));
1387   } else {
1388     g_return_if_reached ();
1389   }
1390
1391   switch (ret) {
1392     case GST_FLOW_UNEXPECTED:
1393       goto eos;                 /* EOS ? */
1394     case GST_FLOW_ERROR:
1395       goto error;
1396     case GST_FLOW_NOT_LINKED:
1397       break;                    /* TODO what to do here? pause the task or just keep pushing? */
1398     case GST_FLOW_OK:
1399     default:
1400       break;
1401   }
1402
1403   GST_LOG_OBJECT (mssdemux, "Stream loop end");
1404   return;
1405
1406 eos:
1407   {
1408     GST_DEBUG_OBJECT (mssdemux, "EOS on all pads");
1409     gst_task_pause (mssdemux->stream_task);
1410     return;
1411   }
1412 error:
1413   {
1414     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
1415     gst_task_pause (mssdemux->stream_task);
1416     return;
1417   }
1418 stop:
1419   {
1420     GST_DEBUG_OBJECT (mssdemux, "Pausing streaming task");
1421     gst_task_pause (mssdemux->stream_task);
1422     return;
1423   }
1424 }