22bfdc7aca7acf5f93c00f9176de10b3f6f2ec3c
[platform/upstream/gstreamer.git] / ext / smoothstreaming / gstmssdemux.c
1 /* GStreamer
2  * Copyright (C) 2012 Smart TV Alliance
3  *  Author: Thiago Sousa Santos <thiago.sousa.santos@collabora.com>, Collabora Ltd.
4  *
5  * gstmssdemux.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:element-mssdemux
25  *
26  * Demuxes a Microsoft's Smooth Streaming manifest into its audio and/or video streams.
27  *
28  * TODO
29  */
30
31 #ifdef HAVE_CONFIG_H
32 #include "config.h"
33 #endif
34
35 #include "gst/gst-i18n-plugin.h"
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40
41 #include "gstmssdemux.h"
42
43 GST_DEBUG_CATEGORY (mssdemux_debug);
44
45 static GstStaticPadTemplate gst_mss_demux_sink_template =
46 GST_STATIC_PAD_TEMPLATE ("sink",
47     GST_PAD_SINK,
48     GST_PAD_ALWAYS,
49     GST_STATIC_CAPS ("application/vnd.ms-sstr+xml")
50     );
51
52 static GstStaticPadTemplate gst_mss_demux_videosrc_template =
53 GST_STATIC_PAD_TEMPLATE ("video_%02u",
54     GST_PAD_SRC,
55     GST_PAD_SOMETIMES,
56     GST_STATIC_CAPS_ANY);
57
58 static GstStaticPadTemplate gst_mss_demux_audiosrc_template =
59 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
60     GST_PAD_SRC,
61     GST_PAD_SOMETIMES,
62     GST_STATIC_CAPS_ANY);
63
64 GST_BOILERPLATE (GstMssDemux, gst_mss_demux, GstMssDemux, GST_TYPE_ELEMENT);
65
66 static void gst_mss_demux_dispose (GObject * object);
67 static GstStateChangeReturn
68 gst_mss_demux_change_state (GstElement * element, GstStateChange transition);
69 static GstFlowReturn gst_mss_demux_chain (GstPad * pad, GstBuffer * buffer);
70 static GstFlowReturn gst_mss_demux_event (GstPad * pad, GstEvent * event);
71
72 static gboolean gst_mss_demux_src_query (GstPad * pad, GstQuery * query);
73
74 static void gst_mss_demux_stream_loop (GstMssDemuxStream * stream);
75
76 static void gst_mss_demux_process_manifest (GstMssDemux * mssdemux);
77
78 static void
79 gst_mss_demux_base_init (gpointer klass)
80 {
81   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
82
83   gst_element_class_add_static_pad_template (element_class,
84       &gst_mss_demux_sink_template);
85   gst_element_class_add_static_pad_template (element_class,
86       &gst_mss_demux_videosrc_template);
87   gst_element_class_add_static_pad_template (element_class,
88       &gst_mss_demux_audiosrc_template);
89   gst_element_class_set_details_simple (element_class, "Smooth Streaming "
90       "demuxer", "Demuxer",
91       "Parse and demultiplex a Smooth Streaming manifest into audio and video "
92       "streams", "Thiago Santos <thiago.sousa.santos@collabora.com>");
93
94   GST_DEBUG_CATEGORY_INIT (mssdemux_debug, "mssdemux", 0, "mssdemux plugin");
95 }
96
97 static void
98 gst_mss_demux_class_init (GstMssDemuxClass * klass)
99 {
100   GObjectClass *gobject_class;
101   GstElementClass *gstelement_class;
102
103   gobject_class = (GObjectClass *) klass;
104   gstelement_class = (GstElementClass *) klass;
105
106   parent_class = g_type_class_peek_parent (klass);
107
108   gobject_class->dispose = gst_mss_demux_dispose;
109
110   gstelement_class->change_state =
111       GST_DEBUG_FUNCPTR (gst_mss_demux_change_state);
112 }
113
114 static void
115 gst_mss_demux_init (GstMssDemux * mssdemux, GstMssDemuxClass * klass)
116 {
117   mssdemux->sinkpad =
118       gst_pad_new_from_static_template (&gst_mss_demux_sink_template, "sink");
119   gst_pad_set_chain_function (mssdemux->sinkpad,
120       GST_DEBUG_FUNCPTR (gst_mss_demux_chain));
121   gst_pad_set_event_function (mssdemux->sinkpad,
122       GST_DEBUG_FUNCPTR (gst_mss_demux_event));
123   gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), mssdemux->sinkpad);
124 }
125
126 static GstMssDemuxStream *
127 gst_mss_demux_stream_new (GstMssDemux * mssdemux,
128     GstMssStream * manifeststream, GstPad * srcpad)
129 {
130   GstMssDemuxStream *stream;
131
132   stream = g_new0 (GstMssDemuxStream, 1);
133   stream->downloader = gst_uri_downloader_new ();
134
135   /* Streaming task */
136   g_static_rec_mutex_init (&stream->stream_lock);
137   stream->stream_task =
138       gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, stream);
139   gst_task_set_lock (stream->stream_task, &stream->stream_lock);
140
141   stream->pad = srcpad;
142   stream->manifest_stream = manifeststream;
143   stream->parent = mssdemux;
144
145   return stream;
146 }
147
148 static void
149 gst_mss_demux_stream_free (GstMssDemuxStream * stream)
150 {
151   if (stream->stream_task) {
152     if (GST_TASK_STATE (stream->stream_task) != GST_TASK_STOPPED) {
153       GST_DEBUG_OBJECT (stream->parent, "Leaving streaming task %s:%s",
154           GST_DEBUG_PAD_NAME (stream->pad));
155       gst_task_stop (stream->stream_task);
156       g_static_rec_mutex_lock (&stream->stream_lock);
157       g_static_rec_mutex_unlock (&stream->stream_lock);
158       GST_LOG_OBJECT (stream->parent, "Waiting for task to finish");
159       gst_task_join (stream->stream_task);
160       GST_LOG_OBJECT (stream->parent, "Finished");
161     }
162     gst_object_unref (stream->stream_task);
163     g_static_rec_mutex_free (&stream->stream_lock);
164     stream->stream_task = NULL;
165   }
166
167   if (stream->pending_newsegment) {
168     gst_event_unref (stream->pending_newsegment);
169     stream->pending_newsegment = NULL;
170   }
171
172
173   if (stream->downloader != NULL) {
174     g_object_unref (stream->downloader);
175     stream->downloader = NULL;
176   }
177   if (stream->pad) {
178     gst_object_unref (stream->pad);
179     stream->pad = NULL;
180   }
181   g_free (stream);
182 }
183
184 static void
185 gst_mss_demux_reset (GstMssDemux * mssdemux)
186 {
187   GSList *iter;
188   if (mssdemux->manifest_buffer) {
189     gst_buffer_unref (mssdemux->manifest_buffer);
190     mssdemux->manifest_buffer = NULL;
191   }
192
193   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
194     GstMssDemuxStream *stream = iter->data;
195     gst_element_remove_pad (GST_ELEMENT_CAST (mssdemux), stream->pad);
196     gst_mss_demux_stream_free (stream);
197   }
198   g_slist_free (mssdemux->streams);
199   mssdemux->streams = NULL;
200
201   if (mssdemux->manifest) {
202     gst_mss_manifest_free (mssdemux->manifest);
203     mssdemux->manifest = NULL;
204   }
205
206   mssdemux->n_videos = mssdemux->n_audios = 0;
207   g_free (mssdemux->base_url);
208   mssdemux->base_url = NULL;
209 }
210
211 static void
212 gst_mss_demux_dispose (GObject * object)
213 {
214   /* GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (object); */
215
216   G_OBJECT_CLASS (parent_class)->dispose (object);
217 }
218
219 static GstStateChangeReturn
220 gst_mss_demux_change_state (GstElement * element, GstStateChange transition)
221 {
222   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (element);
223   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
224
225   switch (transition) {
226     case GST_STATE_CHANGE_PAUSED_TO_READY:
227       gst_mss_demux_reset (mssdemux);
228       break;
229     case GST_STATE_CHANGE_READY_TO_NULL:
230       break;
231     default:
232       break;
233   }
234
235   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
236
237   switch (transition) {
238     case GST_STATE_CHANGE_PAUSED_TO_READY:{
239       break;
240     }
241     default:
242       break;
243   }
244
245   return result;
246 }
247
248 static GstFlowReturn
249 gst_mss_demux_chain (GstPad * pad, GstBuffer * buffer)
250 {
251   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (GST_PAD_PARENT (pad));
252   if (mssdemux->manifest_buffer == NULL)
253     mssdemux->manifest_buffer = buffer;
254   else
255     mssdemux->manifest_buffer =
256         gst_buffer_join (mssdemux->manifest_buffer, buffer);
257
258   return GST_FLOW_OK;
259 }
260
261 static void
262 gst_mss_demux_start (GstMssDemux * mssdemux)
263 {
264   GSList *iter;
265
266   GST_INFO_OBJECT (mssdemux, "Starting streams' tasks");
267   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
268     GstMssDemuxStream *stream = iter->data;
269     gst_task_start (stream->stream_task);
270   }
271 }
272
273 static gboolean
274 gst_mss_demux_push_src_event (GstMssDemux * mssdemux, GstEvent * event)
275 {
276   GSList *iter;
277   gboolean ret = TRUE;
278
279   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
280     GstMssDemuxStream *stream = iter->data;
281     gst_event_ref (event);
282     ret = ret & gst_pad_push_event (stream->pad, event);
283   }
284   return ret;
285 }
286
287 static gboolean
288 gst_mss_demux_event (GstPad * pad, GstEvent * event)
289 {
290   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (GST_PAD_PARENT (pad));
291   gboolean forward = TRUE;
292   gboolean ret = TRUE;
293
294   switch (GST_EVENT_TYPE (event)) {
295     case GST_EVENT_EOS:
296       if (mssdemux->manifest_buffer == NULL) {
297         GST_WARNING_OBJECT (mssdemux, "Received EOS without a manifest.");
298         break;
299       }
300
301       gst_mss_demux_process_manifest (mssdemux);
302       gst_mss_demux_start (mssdemux);
303       forward = FALSE;
304       break;
305     default:
306       break;
307   }
308
309   if (forward) {
310     ret = gst_pad_event_default (pad, event);
311   } else {
312     gst_event_unref (event);
313   }
314
315   return ret;
316 }
317
318 static gboolean
319 gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
320 {
321   GstMssDemux *mssdemux;
322
323   mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad));
324
325   switch (event->type) {
326     case GST_EVENT_SEEK:
327     {
328       gdouble rate;
329       GstFormat format;
330       GstSeekFlags flags;
331       GstSeekType start_type, stop_type;
332       gint64 start, stop;
333       GstEvent *newsegment;
334       GSList *iter;
335
336       GST_INFO_OBJECT (mssdemux, "Received GST_EVENT_SEEK");
337
338       gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
339           &stop_type, &stop);
340
341       if (format != GST_FORMAT_TIME)
342         return FALSE;
343
344       GST_DEBUG_OBJECT (mssdemux,
345           "seek event, rate: %f start: %" GST_TIME_FORMAT " stop: %"
346           GST_TIME_FORMAT, rate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
347
348       if (flags & GST_SEEK_FLAG_FLUSH) {
349         GstEvent *flush = gst_event_new_flush_start ();
350         GST_DEBUG_OBJECT (mssdemux, "sending flush start");
351
352         gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
353         gst_mss_demux_push_src_event (mssdemux, flush);
354         gst_event_unref (flush);
355       }
356
357       /* stop the tasks */
358       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
359         GstMssDemuxStream *stream = iter->data;
360
361         gst_uri_downloader_cancel (stream->downloader);
362         gst_task_pause (stream->stream_task);
363       }
364       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
365         GstMssDemuxStream *stream = iter->data;
366         g_static_rec_mutex_lock (&stream->stream_lock);
367       }
368
369       if (!gst_mss_manifest_seek (mssdemux->manifest, start)) {;
370         GST_WARNING_OBJECT (mssdemux, "Could not find seeked fragment");
371         return FALSE;
372       }
373
374       newsegment =
375           gst_event_new_new_segment (FALSE, rate, format, start, stop, start);
376       gst_event_set_seqnum (newsegment, gst_event_get_seqnum (event));
377       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
378         GstMssDemuxStream *stream = iter->data;
379
380         stream->pending_newsegment = gst_event_ref (newsegment);
381       }
382       gst_event_unref (newsegment);
383
384       if (flags & GST_SEEK_FLAG_FLUSH) {
385         GstEvent *flush = gst_event_new_flush_stop ();
386         GST_DEBUG_OBJECT (mssdemux, "sending flush stop");
387
388         gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
389         gst_mss_demux_push_src_event (mssdemux, flush);
390         gst_event_unref (flush);
391       }
392
393       /* restart tasks */
394       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
395         GstMssDemuxStream *stream = iter->data;
396         g_static_rec_mutex_unlock (&stream->stream_lock);
397       }
398       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
399         GstMssDemuxStream *stream = iter->data;
400
401         gst_task_start (stream->stream_task);
402       }
403
404       return TRUE;
405     }
406     default:
407       break;
408   }
409
410   return gst_pad_event_default (pad, event);
411 }
412
413 static gboolean
414 gst_mss_demux_src_query (GstPad * pad, GstQuery * query)
415 {
416   GstMssDemux *mssdemux;
417   gboolean ret = FALSE;
418
419   if (query == NULL)
420     return FALSE;
421
422   mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad));
423
424   switch (query->type) {
425     case GST_QUERY_DURATION:{
426       GstClockTime duration = -1;
427       GstFormat fmt;
428
429       gst_query_parse_duration (query, &fmt, NULL);
430       if (fmt == GST_FORMAT_TIME && mssdemux->manifest) {
431         /* TODO should we use the streams accumulated duration or the main manifest duration? */
432         duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
433
434         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
435           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
436           ret = TRUE;
437         }
438       }
439       GST_INFO_OBJECT (mssdemux, "GST_QUERY_DURATION returns %s with duration %"
440           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
441       break;
442     }
443     case GST_QUERY_LATENCY:
444       gst_query_set_latency (query, FALSE, 0, -1);
445       ret = TRUE;
446       break;
447     case GST_QUERY_SEEKING:{
448       GstFormat fmt;
449       gint64 stop = -1;
450
451       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
452       GST_INFO_OBJECT (mssdemux, "Received GST_QUERY_SEEKING with format %d",
453           fmt);
454       if (fmt == GST_FORMAT_TIME) {
455         GstClockTime duration;
456         duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
457         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
458           stop = duration;
459         gst_query_set_seeking (query, fmt, TRUE, 0, stop);
460         ret = TRUE;
461         GST_INFO_OBJECT (mssdemux, "GST_QUERY_SEEKING returning with stop : %"
462             GST_TIME_FORMAT, GST_TIME_ARGS (stop));
463       }
464       break;
465     }
466     default:
467       /* Don't fordward queries upstream because of the special nature of this
468        *  "demuxer", which relies on the upstream element only to be fed
469        *  the Manifest
470        */
471       break;
472   }
473
474   return ret;
475 }
476
477 static void
478 _set_src_pad_functions (GstPad * pad)
479 {
480   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_query));
481   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_event));
482 }
483
484 static void
485 gst_mss_demux_create_streams (GstMssDemux * mssdemux)
486 {
487   GSList *streams = gst_mss_manifest_get_streams (mssdemux->manifest);
488   GSList *iter;
489
490   if (streams == NULL) {
491     GST_INFO_OBJECT (mssdemux, "No streams found in the manifest");
492     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
493         (_("This file contains no playable streams.")),
494         ("no streams found at the Manifest"));
495     return;
496   }
497
498   for (iter = streams; iter; iter = g_slist_next (iter)) {
499     gchar *name;
500     GstPad *srcpad = NULL;
501     GstMssDemuxStream *stream = NULL;
502     GstMssStream *manifeststream = iter->data;
503     GstMssStreamType streamtype;
504
505     streamtype = gst_mss_stream_get_type (manifeststream);
506     GST_DEBUG_OBJECT (mssdemux, "Found stream of type: %s",
507         gst_mss_stream_type_name (streamtype));
508
509     /* TODO use stream's name as the pad name? */
510     if (streamtype == MSS_STREAM_TYPE_VIDEO) {
511       name = g_strdup_printf ("video_%02u", mssdemux->n_videos++);
512       srcpad =
513           gst_pad_new_from_static_template (&gst_mss_demux_videosrc_template,
514           name);
515       g_free (name);
516     } else if (streamtype == MSS_STREAM_TYPE_AUDIO) {
517       name = g_strdup_printf ("audio_%02u", mssdemux->n_audios++);
518       srcpad =
519           gst_pad_new_from_static_template (&gst_mss_demux_audiosrc_template,
520           name);
521       g_free (name);
522     }
523
524     if (!srcpad) {
525       GST_WARNING_OBJECT (mssdemux, "Ignoring unknown type stream");
526       continue;
527     }
528
529     _set_src_pad_functions (srcpad);
530
531     stream = gst_mss_demux_stream_new (mssdemux, manifeststream, srcpad);
532     gst_mss_stream_set_active (manifeststream, TRUE);
533     mssdemux->streams = g_slist_append (mssdemux->streams, stream);
534   }
535 }
536
537 static gboolean
538 gst_mss_demux_expose_stream (GstMssDemux * mssdemux, GstMssDemuxStream * stream)
539 {
540   GstCaps *caps;
541   GstCaps *media_caps;
542   GstPad *pad = stream->pad;
543
544   media_caps = gst_mss_stream_get_caps (stream->manifest_stream);
545
546   if (media_caps) {
547     caps = gst_caps_new_simple ("video/quicktime", "variant", G_TYPE_STRING,
548         "mss-fragmented", "timescale", G_TYPE_UINT64,
549         gst_mss_stream_get_timescale (stream->manifest_stream), "media-caps",
550         GST_TYPE_CAPS, media_caps, NULL);
551     gst_caps_unref (media_caps);
552     gst_pad_set_caps (pad, caps);
553     gst_caps_unref (caps);
554
555     gst_pad_set_active (pad, TRUE);
556     GST_INFO_OBJECT (mssdemux, "Adding srcpad %s:%s with caps %" GST_PTR_FORMAT,
557         GST_DEBUG_PAD_NAME (pad), caps);
558     gst_object_ref (pad);
559     gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), pad);
560   } else {
561     GST_WARNING_OBJECT (mssdemux,
562         "Couldn't get caps from manifest stream %p %s, not exposing it", stream,
563         GST_PAD_NAME (stream->pad));
564     return FALSE;
565   }
566   return TRUE;
567 }
568
569 static void
570 gst_mss_demux_process_manifest (GstMssDemux * mssdemux)
571 {
572   GstQuery *query;
573   gchar *uri = NULL;
574   gboolean ret;
575   GSList *iter;
576
577   g_return_if_fail (mssdemux->manifest_buffer != NULL);
578   g_return_if_fail (mssdemux->manifest == NULL);
579
580   query = gst_query_new_uri ();
581   ret = gst_pad_peer_query (mssdemux->sinkpad, query);
582   if (ret) {
583     gchar *baseurl_end;
584     gst_query_parse_uri (query, &uri);
585     GST_INFO_OBJECT (mssdemux, "Upstream is using URI: %s", uri);
586
587     baseurl_end = g_strrstr (uri, "/Manifest");
588     if (baseurl_end) {
589       /* set the new end of the string */
590       baseurl_end[0] = '\0';
591     } else {
592       GST_WARNING_OBJECT (mssdemux, "Stream's URI didn't end with /manifest");
593     }
594
595     mssdemux->base_url = uri;
596   }
597   gst_query_unref (query);
598
599   mssdemux->manifest = gst_mss_manifest_new (mssdemux->manifest_buffer);
600   if (!mssdemux->manifest) {
601     GST_ELEMENT_ERROR (mssdemux, STREAM, FORMAT, ("Bad manifest file"),
602         ("Xml manifest file couldn't be parsed"));
603     return;
604   }
605
606   gst_mss_demux_create_streams (mssdemux);
607   for (iter = mssdemux->streams; iter;) {
608     GSList *current = iter;
609     GstMssDemuxStream *stream = iter->data;
610     iter = g_slist_next (iter); /* do it ourselves as we want it done in the beginning of the loop */
611     if (!gst_mss_demux_expose_stream (mssdemux, stream)) {
612       gst_mss_demux_stream_free (stream);
613       mssdemux->streams = g_slist_delete_link (mssdemux->streams, current);
614     }
615   }
616
617   if (!mssdemux->streams) {
618     /* no streams */
619     GST_WARNING_OBJECT (mssdemux, "Couldn't identify the caps for any of the "
620         "streams found in the manifest");
621     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
622         (_("This file contains no playable streams.")),
623         ("No known stream formats found at the Manifest"));
624     return;
625   }
626
627   gst_element_no_more_pads (GST_ELEMENT_CAST (mssdemux));
628 }
629
630 static void
631 gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
632 {
633   GstMssDemux *mssdemux = stream->parent;
634   gchar *path;
635   gchar *url;
636   GstFragment *fragment;
637   GstBuffer *buffer;
638   GstFlowReturn ret;
639
640   GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream);
641   ret = gst_mss_stream_get_fragment_url (stream->manifest_stream, &path);
642   switch (ret) {
643     case GST_FLOW_OK:
644       break;                    /* all is good, let's go */
645     case GST_FLOW_UNEXPECTED:  /* EOS */
646       goto eos;
647     case GST_FLOW_ERROR:
648       goto error;
649     default:
650       break;
651   }
652   if (!path) {
653     goto no_url_error;
654   }
655   GST_DEBUG_OBJECT (mssdemux, "Got url path '%s' for stream %p", path, stream);
656
657   url = g_strdup_printf ("%s/%s", mssdemux->base_url, path);
658
659   fragment = gst_uri_downloader_fetch_uri (stream->downloader, url);
660   g_free (path);
661   g_free (url);
662
663   if (!fragment) {
664     GST_INFO_OBJECT (mssdemux, "No fragment downloaded");
665     /* TODO check if we are truly stoping */
666     return;
667   }
668
669   buffer = gst_fragment_get_buffer (fragment);
670   buffer = gst_buffer_make_metadata_writable (buffer);
671   gst_buffer_set_caps (buffer, GST_PAD_CAPS (stream->pad));
672   GST_BUFFER_TIMESTAMP (buffer) =
673       gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
674   GST_BUFFER_DURATION (buffer) =
675       gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
676
677   if (G_UNLIKELY (stream->pending_newsegment)) {
678     gst_pad_push_event (stream->pad, stream->pending_newsegment);
679     stream->pending_newsegment = NULL;
680   }
681
682   GST_DEBUG_OBJECT (mssdemux, "Pushing buffer of size %u on pad %s",
683       GST_BUFFER_SIZE (buffer), GST_PAD_NAME (stream->pad));
684   ret = gst_pad_push (stream->pad, buffer);
685   switch (ret) {
686     case GST_FLOW_UNEXPECTED:
687       goto eos;                 /* EOS ? */
688     case GST_FLOW_ERROR:
689       goto error;
690     case GST_FLOW_NOT_LINKED:
691       break;                    /* TODO what to do here? pause the task or just keep pushing? */
692     case GST_FLOW_OK:
693     default:
694       break;
695   }
696
697   gst_mss_stream_advance_fragment (stream->manifest_stream);
698   return;
699
700 eos:
701   {
702     GstEvent *eos = gst_event_new_eos ();
703     GST_DEBUG_OBJECT (mssdemux, "Pushing EOS on pad %s:%s",
704         GST_DEBUG_PAD_NAME (stream->pad));
705     gst_pad_push_event (stream->pad, eos);
706     gst_task_stop (stream->stream_task);
707     return;
708   }
709 no_url_error:
710   {
711     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
712         (_("Failed to get fragment URL.")),
713         ("An error happened when getting fragment URL"));
714     gst_task_stop (stream->stream_task);
715     return;
716   }
717 error:
718   {
719     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
720     gst_task_stop (stream->stream_task);
721     return;
722   }
723 }