ac4a2a4210ec1c41c8b03226ec08a624059185c0
[platform/upstream/gstreamer.git] / ext / smoothstreaming / gstmssdemux.c
1 /* GStreamer
2  * Copyright (C) 2012 Smart TV Alliance
3  *  Author: Thiago Sousa Santos <thiago.sousa.santos@collabora.com>, Collabora Ltd.
4  *
5  * gstmssdemux.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:element-mssdemux
25  *
26  * Demuxes a Microsoft's Smooth Streaming manifest into its audio and/or video streams.
27  *
28  * TODO
29  */
30
31 #ifdef HAVE_CONFIG_H
32 #include "config.h"
33 #endif
34
35 #include "gst/gst-i18n-plugin.h"
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40
41 #include "gstmssdemux.h"
42
43 GST_DEBUG_CATEGORY (mssdemux_debug);
44
45 static GstStaticPadTemplate gst_mss_demux_sink_template =
46 GST_STATIC_PAD_TEMPLATE ("sink",
47     GST_PAD_SINK,
48     GST_PAD_ALWAYS,
49     GST_STATIC_CAPS ("application/vnd.ms-sstr+xml")
50     );
51
52 static GstStaticPadTemplate gst_mss_demux_videosrc_template =
53 GST_STATIC_PAD_TEMPLATE ("video_%02u",
54     GST_PAD_SRC,
55     GST_PAD_SOMETIMES,
56     GST_STATIC_CAPS_ANY);
57
58 static GstStaticPadTemplate gst_mss_demux_audiosrc_template =
59 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
60     GST_PAD_SRC,
61     GST_PAD_SOMETIMES,
62     GST_STATIC_CAPS_ANY);
63
64 GST_BOILERPLATE (GstMssDemux, gst_mss_demux, GstMssDemux, GST_TYPE_ELEMENT);
65
66 static void gst_mss_demux_dispose (GObject * object);
67 static GstStateChangeReturn
68 gst_mss_demux_change_state (GstElement * element, GstStateChange transition);
69 static GstFlowReturn gst_mss_demux_chain (GstPad * pad, GstBuffer * buffer);
70 static GstFlowReturn gst_mss_demux_event (GstPad * pad, GstEvent * event);
71
72 static gboolean gst_mss_demux_src_query (GstPad * pad, GstQuery * query);
73
74 static void gst_mss_demux_stream_loop (GstMssDemuxStream * stream);
75
76 static void gst_mss_demux_process_manifest (GstMssDemux * mssdemux);
77
78 static void
79 gst_mss_demux_base_init (gpointer klass)
80 {
81   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
82
83   gst_element_class_add_static_pad_template (element_class,
84       &gst_mss_demux_sink_template);
85   gst_element_class_add_static_pad_template (element_class,
86       &gst_mss_demux_videosrc_template);
87   gst_element_class_add_static_pad_template (element_class,
88       &gst_mss_demux_audiosrc_template);
89   gst_element_class_set_details_simple (element_class, "Smooth Streaming "
90       "demuxer", "Demuxer",
91       "Parse and demultiplex a Smooth Streaming manifest into audio and video "
92       "streams", "Thiago Santos <thiago.sousa.santos@collabora.com>");
93
94   GST_DEBUG_CATEGORY_INIT (mssdemux_debug, "mssdemux", 0, "mssdemux plugin");
95 }
96
97 static void
98 gst_mss_demux_class_init (GstMssDemuxClass * klass)
99 {
100   GObjectClass *gobject_class;
101   GstElementClass *gstelement_class;
102
103   gobject_class = (GObjectClass *) klass;
104   gstelement_class = (GstElementClass *) klass;
105
106   parent_class = g_type_class_peek_parent (klass);
107
108   gobject_class->dispose = gst_mss_demux_dispose;
109
110   gstelement_class->change_state =
111       GST_DEBUG_FUNCPTR (gst_mss_demux_change_state);
112 }
113
114 static void
115 gst_mss_demux_init (GstMssDemux * mssdemux, GstMssDemuxClass * klass)
116 {
117   mssdemux->sinkpad =
118       gst_pad_new_from_static_template (&gst_mss_demux_sink_template, "sink");
119   gst_pad_set_chain_function (mssdemux->sinkpad,
120       GST_DEBUG_FUNCPTR (gst_mss_demux_chain));
121   gst_pad_set_event_function (mssdemux->sinkpad,
122       GST_DEBUG_FUNCPTR (gst_mss_demux_event));
123   gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), mssdemux->sinkpad);
124 }
125
126 static GstMssDemuxStream *
127 gst_mss_demux_stream_new (GstMssDemux * mssdemux,
128     GstMssStream * manifeststream, GstPad * srcpad)
129 {
130   GstMssDemuxStream *stream;
131
132   stream = g_new0 (GstMssDemuxStream, 1);
133   stream->downloader = gst_uri_downloader_new ();
134
135   /* Streaming task */
136   g_static_rec_mutex_init (&stream->stream_lock);
137   stream->stream_task =
138       gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, stream);
139   gst_task_set_lock (stream->stream_task, &stream->stream_lock);
140
141   stream->pad = srcpad;
142   stream->manifest_stream = manifeststream;
143   stream->parent = mssdemux;
144
145   return stream;
146 }
147
148 static void
149 gst_mss_demux_stream_free (GstMssDemuxStream * stream)
150 {
151   if (stream->stream_task) {
152     if (GST_TASK_STATE (stream->stream_task) != GST_TASK_STOPPED) {
153       GST_DEBUG_OBJECT (stream->parent, "Leaving streaming task %s:%s",
154           GST_DEBUG_PAD_NAME (stream->pad));
155       gst_task_stop (stream->stream_task);
156       g_static_rec_mutex_lock (&stream->stream_lock);
157       g_static_rec_mutex_unlock (&stream->stream_lock);
158       GST_LOG_OBJECT (stream->parent, "Waiting for task to finish");
159       gst_task_join (stream->stream_task);
160       GST_LOG_OBJECT (stream->parent, "Finished");
161     }
162     gst_object_unref (stream->stream_task);
163     g_static_rec_mutex_free (&stream->stream_lock);
164     stream->stream_task = NULL;
165   }
166
167   if (stream->pending_newsegment) {
168     gst_event_unref (stream->pending_newsegment);
169     stream->pending_newsegment = NULL;
170   }
171
172
173   if (stream->downloader != NULL) {
174     g_object_unref (stream->downloader);
175     stream->downloader = NULL;
176   }
177   if (stream->pad) {
178     gst_object_unref (stream->pad);
179     stream->pad = NULL;
180   }
181   g_free (stream);
182 }
183
184 static void
185 gst_mss_demux_reset (GstMssDemux * mssdemux)
186 {
187   GSList *iter;
188   if (mssdemux->manifest_buffer) {
189     gst_buffer_unref (mssdemux->manifest_buffer);
190     mssdemux->manifest_buffer = NULL;
191   }
192
193   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
194     GstMssDemuxStream *stream = iter->data;
195     gst_element_remove_pad (GST_ELEMENT_CAST (mssdemux), stream->pad);
196     gst_mss_demux_stream_free (stream);
197   }
198   g_slist_free (mssdemux->streams);
199   mssdemux->streams = NULL;
200
201   if (mssdemux->manifest) {
202     gst_mss_manifest_free (mssdemux->manifest);
203     mssdemux->manifest = NULL;
204   }
205
206   mssdemux->n_videos = mssdemux->n_audios = 0;
207   g_free (mssdemux->base_url);
208   mssdemux->base_url = NULL;
209 }
210
211 static void
212 gst_mss_demux_dispose (GObject * object)
213 {
214   /* GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (object); */
215
216   G_OBJECT_CLASS (parent_class)->dispose (object);
217 }
218
219 static GstStateChangeReturn
220 gst_mss_demux_change_state (GstElement * element, GstStateChange transition)
221 {
222   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (element);
223   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
224
225   switch (transition) {
226     case GST_STATE_CHANGE_PAUSED_TO_READY:
227       gst_mss_demux_reset (mssdemux);
228       break;
229     case GST_STATE_CHANGE_READY_TO_NULL:
230       break;
231     default:
232       break;
233   }
234
235   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
236
237   switch (transition) {
238     case GST_STATE_CHANGE_PAUSED_TO_READY:{
239       break;
240     }
241     default:
242       break;
243   }
244
245   return result;
246 }
247
248 static GstFlowReturn
249 gst_mss_demux_chain (GstPad * pad, GstBuffer * buffer)
250 {
251   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (GST_PAD_PARENT (pad));
252   if (mssdemux->manifest_buffer == NULL)
253     mssdemux->manifest_buffer = buffer;
254   else
255     mssdemux->manifest_buffer =
256         gst_buffer_join (mssdemux->manifest_buffer, buffer);
257
258   return GST_FLOW_OK;
259 }
260
261 static void
262 gst_mss_demux_start (GstMssDemux * mssdemux)
263 {
264   GSList *iter;
265
266   GST_INFO_OBJECT (mssdemux, "Starting streams' tasks");
267   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
268     GstMssDemuxStream *stream = iter->data;
269     gst_task_start (stream->stream_task);
270   }
271 }
272
273 static gboolean
274 gst_mss_demux_push_src_event (GstMssDemux * mssdemux, GstEvent * event)
275 {
276   GSList *iter;
277   gboolean ret = TRUE;
278
279   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
280     GstMssDemuxStream *stream = iter->data;
281     gst_event_ref (event);
282     ret = ret & gst_pad_push_event (stream->pad, event);
283   }
284   return ret;
285 }
286
287 static gboolean
288 gst_mss_demux_event (GstPad * pad, GstEvent * event)
289 {
290   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (GST_PAD_PARENT (pad));
291   gboolean forward = TRUE;
292   gboolean ret = TRUE;
293
294   switch (GST_EVENT_TYPE (event)) {
295     case GST_EVENT_EOS:
296       if (mssdemux->manifest_buffer == NULL) {
297         GST_WARNING_OBJECT (mssdemux, "Received EOS without a manifest.");
298         break;
299       }
300
301       gst_mss_demux_process_manifest (mssdemux);
302       gst_mss_demux_start (mssdemux);
303       forward = FALSE;
304       break;
305     default:
306       break;
307   }
308
309   if (forward) {
310     ret = gst_pad_event_default (pad, event);
311   } else {
312     gst_event_unref (event);
313   }
314
315   return ret;
316 }
317
318 static gboolean
319 gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
320 {
321   GstMssDemux *mssdemux;
322
323   mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad));
324
325   switch (event->type) {
326     case GST_EVENT_SEEK:
327     {
328       gdouble rate;
329       GstFormat format;
330       GstSeekFlags flags;
331       GstSeekType start_type, stop_type;
332       gint64 start, stop;
333       GstEvent *newsegment;
334       GSList *iter;
335
336       GST_INFO_OBJECT (mssdemux, "Received GST_EVENT_SEEK");
337
338       gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
339           &stop_type, &stop);
340
341       if (format != GST_FORMAT_TIME)
342         return FALSE;
343
344       GST_DEBUG_OBJECT (mssdemux,
345           "seek event, rate: %f start: %" GST_TIME_FORMAT " stop: %"
346           GST_TIME_FORMAT, rate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
347
348       if (flags & GST_SEEK_FLAG_FLUSH) {
349         GstEvent *flush = gst_event_new_flush_start ();
350         GST_DEBUG_OBJECT (mssdemux, "sending flush start");
351
352         gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
353         gst_mss_demux_push_src_event (mssdemux, flush);
354         gst_event_unref (flush);
355       }
356
357       /* stop the tasks */
358       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
359         GstMssDemuxStream *stream = iter->data;
360
361         gst_uri_downloader_cancel (stream->downloader);
362         gst_task_pause (stream->stream_task);
363       }
364       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
365         GstMssDemuxStream *stream = iter->data;
366         g_static_rec_mutex_lock (&stream->stream_lock);
367       }
368
369       if (!gst_mss_manifest_seek (mssdemux->manifest, start)) {;
370         GST_WARNING_OBJECT (mssdemux, "Could not find seeked fragment");
371         return FALSE;
372       }
373
374       newsegment =
375           gst_event_new_new_segment (FALSE, rate, format, start, stop, start);
376       gst_event_set_seqnum (newsegment, gst_event_get_seqnum (event));
377       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
378         GstMssDemuxStream *stream = iter->data;
379
380         stream->pending_newsegment = gst_event_ref (newsegment);
381       }
382       gst_event_unref (newsegment);
383
384       if (flags & GST_SEEK_FLAG_FLUSH) {
385         GstEvent *flush = gst_event_new_flush_stop ();
386         GST_DEBUG_OBJECT (mssdemux, "sending flush stop");
387
388         gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
389         gst_mss_demux_push_src_event (mssdemux, flush);
390         gst_event_unref (flush);
391       }
392
393       /* restart tasks */
394       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
395         GstMssDemuxStream *stream = iter->data;
396         g_static_rec_mutex_unlock (&stream->stream_lock);
397       }
398       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
399         GstMssDemuxStream *stream = iter->data;
400
401         gst_task_start (stream->stream_task);
402       }
403
404       return TRUE;
405     }
406     default:
407       break;
408   }
409
410   return gst_pad_event_default (pad, event);
411 }
412
413 static gboolean
414 gst_mss_demux_src_query (GstPad * pad, GstQuery * query)
415 {
416   GstMssDemux *mssdemux;
417   gboolean ret = FALSE;
418
419   if (query == NULL)
420     return FALSE;
421
422   mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad));
423
424   switch (query->type) {
425     case GST_QUERY_DURATION:{
426       GstClockTime duration = -1;
427       GstFormat fmt;
428
429       gst_query_parse_duration (query, &fmt, NULL);
430       if (fmt == GST_FORMAT_TIME && mssdemux->manifest) {
431         /* TODO should we use the streams accumulated duration or the main manifest duration? */
432         duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
433
434         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
435           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
436           ret = TRUE;
437         }
438       }
439       GST_INFO_OBJECT (mssdemux, "GST_QUERY_DURATION returns %s with duration %"
440           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
441       break;
442     }
443     case GST_QUERY_LATENCY:
444       gst_query_set_latency (query, FALSE, 0, -1);
445       ret = TRUE;
446       break;
447     case GST_QUERY_SEEKING:{
448       GstFormat fmt;
449       gint64 stop = -1;
450
451       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
452       GST_INFO_OBJECT (mssdemux, "Received GST_QUERY_SEEKING with format %d",
453           fmt);
454       if (fmt == GST_FORMAT_TIME) {
455         GstClockTime duration;
456         duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
457         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
458           stop = duration;
459         gst_query_set_seeking (query, fmt, TRUE, 0, stop);
460         ret = TRUE;
461         GST_INFO_OBJECT (mssdemux, "GST_QUERY_SEEKING returning with stop : %"
462             GST_TIME_FORMAT, GST_TIME_ARGS (stop));
463       }
464       break;
465     }
466     default:
467       /* Don't fordward queries upstream because of the special nature of this
468        *  "demuxer", which relies on the upstream element only to be fed
469        *  the Manifest
470        */
471       break;
472   }
473
474   return ret;
475 }
476
477 static void
478 _set_src_pad_functions (GstPad * pad)
479 {
480   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_query));
481   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_event));
482 }
483
484 static void
485 gst_mss_demux_create_streams (GstMssDemux * mssdemux)
486 {
487   GSList *streams = gst_mss_manifest_get_streams (mssdemux->manifest);
488   GSList *iter;
489
490   if (streams == NULL) {
491     GST_INFO_OBJECT (mssdemux, "No streams found in the manifest");
492     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
493         (_("This file contains no playable streams.")),
494         ("no streams found at the Manifest"));
495     return;
496   }
497
498   for (iter = streams; iter; iter = g_slist_next (iter)) {
499     gchar *name;
500     GstPad *srcpad = NULL;
501     GstMssDemuxStream *stream = NULL;
502     GstMssStream *manifeststream = iter->data;
503     GstMssStreamType streamtype;
504
505     streamtype = gst_mss_stream_get_type (manifeststream);
506     GST_DEBUG_OBJECT (mssdemux, "Found stream of type: %s",
507         gst_mss_stream_type_name (streamtype));
508
509     /* TODO use stream's name as the pad name? */
510     if (streamtype == MSS_STREAM_TYPE_VIDEO) {
511       name = g_strdup_printf ("video_%02u", mssdemux->n_videos++);
512       srcpad =
513           gst_pad_new_from_static_template (&gst_mss_demux_videosrc_template,
514           name);
515       g_free (name);
516     } else if (streamtype == MSS_STREAM_TYPE_AUDIO) {
517       name = g_strdup_printf ("audio_%02u", mssdemux->n_audios++);
518       srcpad =
519           gst_pad_new_from_static_template (&gst_mss_demux_audiosrc_template,
520           name);
521       g_free (name);
522     }
523
524     if (!srcpad) {
525       GST_WARNING_OBJECT (mssdemux, "Ignoring unknown type stream");
526       continue;
527     }
528
529     _set_src_pad_functions (srcpad);
530
531     stream = gst_mss_demux_stream_new (mssdemux, manifeststream, srcpad);
532     mssdemux->streams = g_slist_append (mssdemux->streams, stream);
533   }
534 }
535
536 static void
537 gst_mss_demux_expose_stream (GstMssDemux * mssdemux, GstMssDemuxStream * stream)
538 {
539   GstCaps *caps;
540   GstCaps *media_caps;
541   GstPad *pad = stream->pad;
542
543   media_caps = gst_mss_stream_get_caps (stream->manifest_stream);
544   caps = gst_caps_new_simple ("video/quicktime", "variant", G_TYPE_STRING,
545       "mss-fragmented", "timescale", G_TYPE_UINT64,
546       gst_mss_stream_get_timescale (stream->manifest_stream), "media-caps",
547       GST_TYPE_CAPS, media_caps, NULL);
548   gst_caps_unref (media_caps);
549
550   if (caps) {
551     gst_pad_set_caps (pad, caps);
552     gst_caps_unref (caps);
553
554     gst_pad_set_active (pad, TRUE);
555     GST_INFO_OBJECT (mssdemux, "Adding srcpad %s:%s with caps %" GST_PTR_FORMAT,
556         GST_DEBUG_PAD_NAME (pad), caps);
557     gst_object_ref (pad);
558     gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), pad);
559   } else {
560     GST_WARNING_OBJECT (mssdemux, "Not exposing stream of unrecognized format");
561   }
562 }
563
564 static void
565 gst_mss_demux_process_manifest (GstMssDemux * mssdemux)
566 {
567   GstQuery *query;
568   gchar *uri = NULL;
569   gboolean ret;
570   GSList *iter;
571
572   g_return_if_fail (mssdemux->manifest_buffer != NULL);
573   g_return_if_fail (mssdemux->manifest == NULL);
574
575   query = gst_query_new_uri ();
576   ret = gst_pad_peer_query (mssdemux->sinkpad, query);
577   if (ret) {
578     gchar *baseurl_end;
579     gst_query_parse_uri (query, &uri);
580     GST_INFO_OBJECT (mssdemux, "Upstream is using URI: %s", uri);
581
582     baseurl_end = g_strrstr (uri, "/Manifest");
583     if (baseurl_end) {
584       /* set the new end of the string */
585       baseurl_end[0] = '\0';
586     } else {
587       GST_WARNING_OBJECT (mssdemux, "Stream's URI didn't end with /manifest");
588     }
589
590     mssdemux->base_url = uri;
591   }
592   gst_query_unref (query);
593
594   mssdemux->manifest = gst_mss_manifest_new (mssdemux->manifest_buffer);
595   if (!mssdemux->manifest) {
596     GST_ELEMENT_ERROR (mssdemux, STREAM, FORMAT, ("Bad manifest file"),
597         ("Xml manifest file couldn't be parsed"));
598     return;
599   }
600
601   gst_mss_demux_create_streams (mssdemux);
602   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
603     gst_mss_demux_expose_stream (mssdemux, iter->data);
604   }
605 }
606
607 static void
608 gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
609 {
610   GstMssDemux *mssdemux = stream->parent;
611   gchar *path;
612   gchar *url;
613   GstFragment *fragment;
614   GstBuffer *buffer;
615   GstFlowReturn ret;
616
617   GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream);
618   ret = gst_mss_stream_get_fragment_url (stream->manifest_stream, &path);
619   switch (ret) {
620     case GST_FLOW_OK:
621       break;                    /* all is good, let's go */
622     case GST_FLOW_UNEXPECTED:  /* EOS */
623       goto eos;
624     case GST_FLOW_ERROR:
625       goto error;
626     default:
627       break;
628   }
629   if (!path) {
630     goto no_url_error;
631   }
632   GST_DEBUG_OBJECT (mssdemux, "Got url path '%s' for stream %p", path, stream);
633
634   url = g_strdup_printf ("%s/%s", mssdemux->base_url, path);
635
636   fragment = gst_uri_downloader_fetch_uri (stream->downloader, url);
637   g_free (path);
638   g_free (url);
639
640   if (!fragment) {
641     GST_INFO_OBJECT (mssdemux, "No fragment downloaded");
642     /* TODO check if we are truly stoping */
643     return;
644   }
645
646   buffer = gst_fragment_get_buffer (fragment);
647   buffer = gst_buffer_make_metadata_writable (buffer);
648   gst_buffer_set_caps (buffer, GST_PAD_CAPS (stream->pad));
649   GST_BUFFER_TIMESTAMP (buffer) =
650       gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
651   GST_BUFFER_DURATION (buffer) =
652       gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
653
654   if (G_UNLIKELY (stream->pending_newsegment)) {
655     gst_pad_push_event (stream->pad, stream->pending_newsegment);
656     stream->pending_newsegment = NULL;
657   }
658
659   GST_DEBUG_OBJECT (mssdemux, "Pushing buffer of size %u on pad %s",
660       GST_BUFFER_SIZE (buffer), GST_PAD_NAME (stream->pad));
661   ret = gst_pad_push (stream->pad, buffer);
662   switch (ret) {
663     case GST_FLOW_UNEXPECTED:
664       goto eos;                 /* EOS ? */
665     case GST_FLOW_ERROR:
666       goto error;
667     case GST_FLOW_NOT_LINKED:
668       break;                    /* TODO what to do here? pause the task or just keep pushing? */
669     case GST_FLOW_OK:
670     default:
671       break;
672   }
673
674   gst_mss_stream_advance_fragment (stream->manifest_stream);
675   return;
676
677 eos:
678   {
679     GstEvent *eos = gst_event_new_eos ();
680     GST_DEBUG_OBJECT (mssdemux, "Pushing EOS on pad %s:%s",
681         GST_DEBUG_PAD_NAME (stream->pad));
682     gst_pad_push_event (stream->pad, eos);
683     gst_task_stop (stream->stream_task);
684     return;
685   }
686 no_url_error:
687   {
688     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
689         (_("Failed to get fragment URL.")),
690         ("An error happened when getting fragment URL"));
691     gst_task_stop (stream->stream_task);
692     return;
693   }
694 error:
695   {
696     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
697     gst_task_stop (stream->stream_task);
698     return;
699   }
700 }