121b52452c905e2f8147d792ded71025905ee468
[platform/upstream/gstreamer.git] / ext / smoothstreaming / gstmssdemux.c
1 /* GStreamer
2  * Copyright (C) 2012 Smart TV Alliance
3  *  Author: Thiago Sousa Santos <thiago.sousa.santos@collabora.com>, Collabora Ltd.
4  *
5  * gstmssdemux.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:element-mssdemux
25  *
26  * Demuxes a Microsoft's Smooth Streaming manifest into its audio and/or video streams.
27  *
28  * TODO
29  */
30
31 #ifdef HAVE_CONFIG_H
32 #include "config.h"
33 #endif
34
35 #include "gst/gst-i18n-plugin.h"
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40
41 #include "gstmssdemux.h"
42
43 GST_DEBUG_CATEGORY (mssdemux_debug);
44
45 #define DEFAULT_CONNECTION_SPEED 0
46 #define DEFAULT_MAX_QUEUE_SIZE_BUFFERS 0
47
48 enum
49 {
50   PROP_0,
51
52   PROP_CONNECTION_SPEED,
53   PROP_MAX_QUEUE_SIZE_BUFFERS,
54   PROP_LAST
55 };
56
57 static GstStaticPadTemplate gst_mss_demux_sink_template =
58 GST_STATIC_PAD_TEMPLATE ("sink",
59     GST_PAD_SINK,
60     GST_PAD_ALWAYS,
61     GST_STATIC_CAPS ("application/vnd.ms-sstr+xml")
62     );
63
64 static GstStaticPadTemplate gst_mss_demux_videosrc_template =
65 GST_STATIC_PAD_TEMPLATE ("video_%02u",
66     GST_PAD_SRC,
67     GST_PAD_SOMETIMES,
68     GST_STATIC_CAPS_ANY);
69
70 static GstStaticPadTemplate gst_mss_demux_audiosrc_template =
71 GST_STATIC_PAD_TEMPLATE ("audio_%02u",
72     GST_PAD_SRC,
73     GST_PAD_SOMETIMES,
74     GST_STATIC_CAPS_ANY);
75
76 GST_BOILERPLATE (GstMssDemux, gst_mss_demux, GstMssDemux, GST_TYPE_ELEMENT);
77
78 static void gst_mss_demux_dispose (GObject * object);
79 static void gst_mss_demux_set_property (GObject * object, guint prop_id,
80     const GValue * value, GParamSpec * pspec);
81 static void gst_mss_demux_get_property (GObject * object, guint prop_id,
82     GValue * value, GParamSpec * pspec);
83 static GstStateChangeReturn gst_mss_demux_change_state (GstElement * element,
84     GstStateChange transition);
85 static GstFlowReturn gst_mss_demux_chain (GstPad * pad, GstBuffer * buffer);
86 static GstFlowReturn gst_mss_demux_event (GstPad * pad, GstEvent * event);
87
88 static gboolean gst_mss_demux_src_query (GstPad * pad, GstQuery * query);
89
90 static void gst_mss_demux_download_loop (GstMssDemuxStream * stream);
91 static void gst_mss_demux_stream_loop (GstMssDemux * mssdemux);
92
93 static gboolean gst_mss_demux_process_manifest (GstMssDemux * mssdemux);
94
95 static void
96 gst_mss_demux_base_init (gpointer klass)
97 {
98   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
99
100   gst_element_class_add_static_pad_template (element_class,
101       &gst_mss_demux_sink_template);
102   gst_element_class_add_static_pad_template (element_class,
103       &gst_mss_demux_videosrc_template);
104   gst_element_class_add_static_pad_template (element_class,
105       &gst_mss_demux_audiosrc_template);
106   gst_element_class_set_details_simple (element_class, "Smooth Streaming "
107       "demuxer", "Demuxer",
108       "Parse and demultiplex a Smooth Streaming manifest into audio and video "
109       "streams", "Thiago Santos <thiago.sousa.santos@collabora.com>");
110
111   GST_DEBUG_CATEGORY_INIT (mssdemux_debug, "mssdemux", 0, "mssdemux plugin");
112 }
113
114 static void
115 gst_mss_demux_class_init (GstMssDemuxClass * klass)
116 {
117   GObjectClass *gobject_class;
118   GstElementClass *gstelement_class;
119
120   gobject_class = (GObjectClass *) klass;
121   gstelement_class = (GstElementClass *) klass;
122
123   parent_class = g_type_class_peek_parent (klass);
124
125   gobject_class->dispose = gst_mss_demux_dispose;
126   gobject_class->set_property = gst_mss_demux_set_property;
127   gobject_class->get_property = gst_mss_demux_get_property;
128
129   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
130       g_param_spec_uint ("connection-speed", "Connection Speed",
131           "Network connection speed in kbps (0 = unknown)",
132           0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
133           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
134
135   g_object_class_install_property (gobject_class, PROP_MAX_QUEUE_SIZE_BUFFERS,
136       g_param_spec_uint ("max-queue-size-buffers", "Max queue size in buffers",
137           "Maximum buffers that can be stored in each internal stream queue "
138           "(0 = infinite)", 0, G_MAXUINT, DEFAULT_MAX_QUEUE_SIZE_BUFFERS,
139           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
140
141   gstelement_class->change_state =
142       GST_DEBUG_FUNCPTR (gst_mss_demux_change_state);
143 }
144
145 static void
146 gst_mss_demux_init (GstMssDemux * mssdemux, GstMssDemuxClass * klass)
147 {
148   mssdemux->sinkpad =
149       gst_pad_new_from_static_template (&gst_mss_demux_sink_template, "sink");
150   gst_pad_set_chain_function (mssdemux->sinkpad,
151       GST_DEBUG_FUNCPTR (gst_mss_demux_chain));
152   gst_pad_set_event_function (mssdemux->sinkpad,
153       GST_DEBUG_FUNCPTR (gst_mss_demux_event));
154   gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), mssdemux->sinkpad);
155
156   g_static_rec_mutex_init (&mssdemux->stream_lock);
157   mssdemux->stream_task =
158       gst_task_create ((GstTaskFunction) gst_mss_demux_stream_loop, mssdemux);
159   gst_task_set_lock (mssdemux->stream_task, &mssdemux->stream_lock);
160
161   mssdemux->data_queue_max_size = DEFAULT_MAX_QUEUE_SIZE_BUFFERS;
162 }
163
164 static gboolean
165 _data_queue_check_full (GstDataQueue * queue, guint visible, guint bytes,
166     guint64 time, gpointer checkdata)
167 {
168   GstMssDemuxStream *stream = checkdata;
169   GstMssDemux *mssdemux = stream->parent;
170
171   if (mssdemux->data_queue_max_size == 0)
172     return FALSE;               /* never full */
173   return visible >= mssdemux->data_queue_max_size;
174 }
175
176 static GstMssDemuxStream *
177 gst_mss_demux_stream_new (GstMssDemux * mssdemux,
178     GstMssStream * manifeststream, GstPad * srcpad)
179 {
180   GstMssDemuxStream *stream;
181
182   stream = g_new0 (GstMssDemuxStream, 1);
183   stream->downloader = gst_uri_downloader_new ();
184   stream->dataqueue = gst_data_queue_new (_data_queue_check_full, stream);
185
186   /* Downloading task */
187   g_static_rec_mutex_init (&stream->download_lock);
188   stream->download_task =
189       gst_task_create ((GstTaskFunction) gst_mss_demux_download_loop, stream);
190   gst_task_set_lock (stream->download_task, &stream->download_lock);
191
192   stream->pad = srcpad;
193   stream->manifest_stream = manifeststream;
194   stream->parent = mssdemux;
195
196   return stream;
197 }
198
199 static void
200 gst_mss_demux_stream_free (GstMssDemuxStream * stream)
201 {
202   if (stream->download_task) {
203     if (GST_TASK_STATE (stream->download_task) != GST_TASK_STOPPED) {
204       GST_DEBUG_OBJECT (stream->parent, "Leaving streaming task %s:%s",
205           GST_DEBUG_PAD_NAME (stream->pad));
206       gst_task_stop (stream->download_task);
207       gst_uri_downloader_cancel (stream->downloader);
208       g_static_rec_mutex_lock (&stream->download_lock);
209       g_static_rec_mutex_unlock (&stream->download_lock);
210       GST_LOG_OBJECT (stream->parent, "Waiting for task to finish");
211       gst_task_join (stream->download_task);
212       GST_LOG_OBJECT (stream->parent, "Finished");
213     }
214     gst_object_unref (stream->download_task);
215     g_static_rec_mutex_free (&stream->download_lock);
216     stream->download_task = NULL;
217   }
218
219   if (stream->pending_newsegment) {
220     gst_event_unref (stream->pending_newsegment);
221     stream->pending_newsegment = NULL;
222   }
223
224
225   if (stream->downloader != NULL) {
226     g_object_unref (stream->downloader);
227     stream->downloader = NULL;
228   }
229   if (stream->dataqueue) {
230     g_object_unref (stream->dataqueue);
231     stream->dataqueue = NULL;
232   }
233   if (stream->pad) {
234     gst_object_unref (stream->pad);
235     stream->pad = NULL;
236   }
237   g_free (stream);
238 }
239
240 static void
241 gst_mss_demux_reset (GstMssDemux * mssdemux)
242 {
243   GSList *iter;
244
245   if (GST_TASK_STATE (mssdemux->stream_task) != GST_TASK_STOPPED) {
246     gst_task_stop (mssdemux->stream_task);
247     g_static_rec_mutex_lock (&mssdemux->stream_lock);
248     g_static_rec_mutex_unlock (&mssdemux->stream_lock);
249     gst_task_join (mssdemux->stream_task);
250   }
251
252   if (mssdemux->manifest_buffer) {
253     gst_buffer_unref (mssdemux->manifest_buffer);
254     mssdemux->manifest_buffer = NULL;
255   }
256
257   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
258     GstMssDemuxStream *stream = iter->data;
259     gst_element_remove_pad (GST_ELEMENT_CAST (mssdemux), stream->pad);
260     gst_mss_demux_stream_free (stream);
261   }
262   g_slist_free (mssdemux->streams);
263   mssdemux->streams = NULL;
264
265   if (mssdemux->manifest) {
266     gst_mss_manifest_free (mssdemux->manifest);
267     mssdemux->manifest = NULL;
268   }
269
270   mssdemux->n_videos = mssdemux->n_audios = 0;
271   g_free (mssdemux->base_url);
272   g_free (mssdemux->manifest_uri);
273   mssdemux->base_url = NULL;
274 }
275
276 static void
277 gst_mss_demux_dispose (GObject * object)
278 {
279   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (object);
280
281   if (mssdemux->stream_task) {
282     gst_object_unref (mssdemux->stream_task);
283     g_static_rec_mutex_free (&mssdemux->stream_lock);
284     mssdemux->stream_task = NULL;
285   }
286
287   G_OBJECT_CLASS (parent_class)->dispose (object);
288 }
289
290 static void
291 gst_mss_demux_set_property (GObject * object, guint prop_id,
292     const GValue * value, GParamSpec * pspec)
293 {
294   GstMssDemux *mssdemux = GST_MSS_DEMUX (object);
295
296   switch (prop_id) {
297     case PROP_CONNECTION_SPEED:
298       GST_OBJECT_LOCK (mssdemux);
299       mssdemux->connection_speed = g_value_get_uint (value) * 1000;
300       mssdemux->update_bitrates = TRUE;
301       GST_DEBUG_OBJECT (mssdemux, "Connection speed set to %llu",
302           mssdemux->connection_speed);
303       GST_OBJECT_UNLOCK (mssdemux);
304       break;
305     case PROP_MAX_QUEUE_SIZE_BUFFERS:
306       mssdemux->data_queue_max_size = g_value_get_uint (value);
307       break;
308     default:
309       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
310       break;
311   }
312 }
313
314 static void
315 gst_mss_demux_get_property (GObject * object, guint prop_id, GValue * value,
316     GParamSpec * pspec)
317 {
318   GstMssDemux *mssdemux = GST_MSS_DEMUX (object);
319
320   switch (prop_id) {
321     case PROP_CONNECTION_SPEED:
322       g_value_set_uint (value, mssdemux->connection_speed / 1000);
323       break;
324     case PROP_MAX_QUEUE_SIZE_BUFFERS:
325       g_value_set_uint (value, mssdemux->data_queue_max_size);
326       break;
327     default:
328       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
329       break;
330   }
331 }
332
333 static GstStateChangeReturn
334 gst_mss_demux_change_state (GstElement * element, GstStateChange transition)
335 {
336   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (element);
337   GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;
338
339   switch (transition) {
340     case GST_STATE_CHANGE_PAUSED_TO_READY:
341       gst_mss_demux_reset (mssdemux);
342       break;
343     case GST_STATE_CHANGE_READY_TO_NULL:
344       break;
345     default:
346       break;
347   }
348
349   result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
350
351   switch (transition) {
352     case GST_STATE_CHANGE_PAUSED_TO_READY:{
353       break;
354     }
355     default:
356       break;
357   }
358
359   return result;
360 }
361
362 static GstFlowReturn
363 gst_mss_demux_chain (GstPad * pad, GstBuffer * buffer)
364 {
365   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (GST_PAD_PARENT (pad));
366   if (mssdemux->manifest_buffer == NULL)
367     mssdemux->manifest_buffer = buffer;
368   else
369     mssdemux->manifest_buffer =
370         gst_buffer_join (mssdemux->manifest_buffer, buffer);
371
372   return GST_FLOW_OK;
373 }
374
375 static void
376 gst_mss_demux_start (GstMssDemux * mssdemux)
377 {
378   GSList *iter;
379
380   GST_INFO_OBJECT (mssdemux, "Starting streams' tasks");
381   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
382     GstMssDemuxStream *stream = iter->data;
383     gst_task_start (stream->download_task);
384   }
385
386   gst_task_start (mssdemux->stream_task);
387 }
388
389 static gboolean
390 gst_mss_demux_push_src_event (GstMssDemux * mssdemux, GstEvent * event)
391 {
392   GSList *iter;
393   gboolean ret = TRUE;
394
395   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
396     GstMssDemuxStream *stream = iter->data;
397     gst_event_ref (event);
398     ret = ret & gst_pad_push_event (stream->pad, event);
399   }
400   return ret;
401 }
402
403 static gboolean
404 gst_mss_demux_event (GstPad * pad, GstEvent * event)
405 {
406   GstMssDemux *mssdemux = GST_MSS_DEMUX_CAST (GST_PAD_PARENT (pad));
407   gboolean forward = TRUE;
408   gboolean ret = TRUE;
409
410   switch (GST_EVENT_TYPE (event)) {
411     case GST_EVENT_EOS:
412       if (mssdemux->manifest_buffer == NULL) {
413         GST_WARNING_OBJECT (mssdemux, "Received EOS without a manifest.");
414         break;
415       }
416
417       if (gst_mss_demux_process_manifest (mssdemux))
418         gst_mss_demux_start (mssdemux);
419       forward = FALSE;
420       break;
421     default:
422       break;
423   }
424
425   if (forward) {
426     ret = gst_pad_event_default (pad, event);
427   } else {
428     gst_event_unref (event);
429   }
430
431   return ret;
432 }
433
434 static void
435 gst_mss_demux_stop_tasks (GstMssDemux * mssdemux, gboolean immediate)
436 {
437   GSList *iter;
438
439   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
440     GstMssDemuxStream *stream = iter->data;
441
442     gst_data_queue_set_flushing (stream->dataqueue, TRUE);
443
444     if (immediate)
445       gst_uri_downloader_cancel (stream->downloader);
446     gst_task_pause (stream->download_task);
447   }
448   gst_task_pause (mssdemux->stream_task);
449
450   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
451     GstMssDemuxStream *stream = iter->data;
452     g_static_rec_mutex_lock (&stream->download_lock);
453   }
454   g_static_rec_mutex_lock (&mssdemux->stream_lock);
455 }
456
457 static void
458 gst_mss_demux_restart_tasks (GstMssDemux * mssdemux)
459 {
460   GSList *iter;
461   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
462     GstMssDemuxStream *stream = iter->data;
463     g_static_rec_mutex_unlock (&stream->download_lock);
464   }
465   g_static_rec_mutex_unlock (&mssdemux->stream_lock);
466   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
467     GstMssDemuxStream *stream = iter->data;
468
469     gst_data_queue_set_flushing (stream->dataqueue, FALSE);
470     gst_task_start (stream->download_task);
471   }
472   gst_task_start (mssdemux->stream_task);
473 }
474
475 static gboolean
476 gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
477 {
478   GstMssDemux *mssdemux;
479
480   mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad));
481
482   switch (event->type) {
483     case GST_EVENT_SEEK:
484     {
485       gdouble rate;
486       GstFormat format;
487       GstSeekFlags flags;
488       GstSeekType start_type, stop_type;
489       gint64 start, stop;
490       GstEvent *newsegment;
491       GSList *iter;
492
493       GST_INFO_OBJECT (mssdemux, "Received GST_EVENT_SEEK");
494
495       gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
496           &stop_type, &stop);
497
498       if (format != GST_FORMAT_TIME)
499         return FALSE;
500
501       GST_DEBUG_OBJECT (mssdemux,
502           "seek event, rate: %f start: %" GST_TIME_FORMAT " stop: %"
503           GST_TIME_FORMAT, rate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
504
505       if (flags & GST_SEEK_FLAG_FLUSH) {
506         GstEvent *flush = gst_event_new_flush_start ();
507         GST_DEBUG_OBJECT (mssdemux, "sending flush start");
508
509         gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
510         gst_mss_demux_push_src_event (mssdemux, flush);
511         gst_event_unref (flush);
512       }
513
514       gst_mss_demux_stop_tasks (mssdemux, TRUE);
515
516       if (!gst_mss_manifest_seek (mssdemux->manifest, start)) {;
517         GST_WARNING_OBJECT (mssdemux, "Could not find seeked fragment");
518         return FALSE;
519       }
520
521       newsegment =
522           gst_event_new_new_segment (FALSE, rate, format, start, stop, start);
523       gst_event_set_seqnum (newsegment, gst_event_get_seqnum (event));
524       for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
525         GstMssDemuxStream *stream = iter->data;
526
527         stream->eos = FALSE;
528         gst_data_queue_flush (stream->dataqueue);
529         stream->pending_newsegment = gst_event_ref (newsegment);
530       }
531       gst_event_unref (newsegment);
532
533       if (flags & GST_SEEK_FLAG_FLUSH) {
534         GstEvent *flush = gst_event_new_flush_stop ();
535         GST_DEBUG_OBJECT (mssdemux, "sending flush stop");
536
537         gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
538         gst_mss_demux_push_src_event (mssdemux, flush);
539         gst_event_unref (flush);
540       }
541
542       gst_mss_demux_restart_tasks (mssdemux);
543
544       return TRUE;
545     }
546     default:
547       break;
548   }
549
550   return gst_pad_event_default (pad, event);
551 }
552
553 static gboolean
554 gst_mss_demux_src_query (GstPad * pad, GstQuery * query)
555 {
556   GstMssDemux *mssdemux;
557   gboolean ret = FALSE;
558
559   if (query == NULL)
560     return FALSE;
561
562   mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad));
563
564   switch (query->type) {
565     case GST_QUERY_DURATION:{
566       GstClockTime duration = -1;
567       GstFormat fmt;
568
569       gst_query_parse_duration (query, &fmt, NULL);
570       if (fmt == GST_FORMAT_TIME && mssdemux->manifest) {
571         /* TODO should we use the streams accumulated duration or the main manifest duration? */
572         duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
573
574         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
575           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
576           ret = TRUE;
577         }
578       }
579       GST_INFO_OBJECT (mssdemux, "GST_QUERY_DURATION returns %s with duration %"
580           GST_TIME_FORMAT, ret ? "TRUE" : "FALSE", GST_TIME_ARGS (duration));
581       break;
582     }
583     case GST_QUERY_LATENCY:{
584       gboolean live = FALSE;
585
586       live = mssdemux->manifest
587           && gst_mss_manifest_is_live (mssdemux->manifest);
588
589       gst_query_set_latency (query, live, 0, -1);
590       ret = TRUE;
591       break;
592     }
593     case GST_QUERY_SEEKING:{
594       GstFormat fmt;
595       gint64 stop = -1;
596
597       if (mssdemux->manifest && gst_mss_manifest_is_live (mssdemux->manifest)) {
598         return FALSE;           /* no live seeking */
599       }
600
601       gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
602       GST_INFO_OBJECT (mssdemux, "Received GST_QUERY_SEEKING with format %d",
603           fmt);
604       if (fmt == GST_FORMAT_TIME) {
605         GstClockTime duration;
606         duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
607         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
608           stop = duration;
609         gst_query_set_seeking (query, fmt, TRUE, 0, stop);
610         ret = TRUE;
611         GST_INFO_OBJECT (mssdemux, "GST_QUERY_SEEKING returning with stop : %"
612             GST_TIME_FORMAT, GST_TIME_ARGS (stop));
613       }
614       break;
615     }
616     default:
617       /* Don't fordward queries upstream because of the special nature of this
618        *  "demuxer", which relies on the upstream element only to be fed
619        *  the Manifest
620        */
621       break;
622   }
623
624   return ret;
625 }
626
627 static void
628 _set_src_pad_functions (GstPad * pad)
629 {
630   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_query));
631   gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_event));
632 }
633
634 static GstPad *
635 _create_pad (GstMssDemux * mssdemux, GstMssStream * manifeststream)
636 {
637   gchar *name;
638   GstPad *srcpad = NULL;
639   GstMssStreamType streamtype;
640
641   streamtype = gst_mss_stream_get_type (manifeststream);
642   GST_DEBUG_OBJECT (mssdemux, "Found stream of type: %s",
643       gst_mss_stream_type_name (streamtype));
644
645   /* TODO use stream's name/bitrate/index as the pad name? */
646   if (streamtype == MSS_STREAM_TYPE_VIDEO) {
647     name = g_strdup_printf ("video_%02u", mssdemux->n_videos++);
648     srcpad =
649         gst_pad_new_from_static_template (&gst_mss_demux_videosrc_template,
650         name);
651     g_free (name);
652   } else if (streamtype == MSS_STREAM_TYPE_AUDIO) {
653     name = g_strdup_printf ("audio_%02u", mssdemux->n_audios++);
654     srcpad =
655         gst_pad_new_from_static_template (&gst_mss_demux_audiosrc_template,
656         name);
657     g_free (name);
658   }
659
660   if (!srcpad) {
661     GST_WARNING_OBJECT (mssdemux, "Ignoring unknown type stream");
662     return NULL;
663   }
664
665   _set_src_pad_functions (srcpad);
666   return srcpad;
667 }
668
669 static void
670 gst_mss_demux_create_streams (GstMssDemux * mssdemux)
671 {
672   GSList *streams = gst_mss_manifest_get_streams (mssdemux->manifest);
673   GSList *iter;
674
675   if (streams == NULL) {
676     GST_INFO_OBJECT (mssdemux, "No streams found in the manifest");
677     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
678         (_("This file contains no playable streams.")),
679         ("no streams found at the Manifest"));
680     return;
681   }
682
683   for (iter = streams; iter; iter = g_slist_next (iter)) {
684     GstPad *srcpad = NULL;
685     GstMssDemuxStream *stream = NULL;
686     GstMssStream *manifeststream = iter->data;
687
688     srcpad = _create_pad (mssdemux, manifeststream);
689
690     if (!srcpad) {
691       continue;
692     }
693
694     stream = gst_mss_demux_stream_new (mssdemux, manifeststream, srcpad);
695     gst_mss_stream_set_active (manifeststream, TRUE);
696     mssdemux->streams = g_slist_append (mssdemux->streams, stream);
697   }
698
699   /* select initial bitrates */
700   GST_OBJECT_LOCK (mssdemux);
701   GST_INFO_OBJECT (mssdemux, "Changing max bitrate to %llu",
702       mssdemux->connection_speed);
703   gst_mss_manifest_change_bitrate (mssdemux->manifest,
704       mssdemux->connection_speed);
705   mssdemux->update_bitrates = FALSE;
706   GST_OBJECT_UNLOCK (mssdemux);
707 }
708
709 static gboolean
710 gst_mss_demux_expose_stream (GstMssDemux * mssdemux, GstMssDemuxStream * stream)
711 {
712   GstCaps *caps;
713   GstCaps *media_caps;
714   GstPad *pad = stream->pad;
715
716   media_caps = gst_mss_stream_get_caps (stream->manifest_stream);
717
718   if (media_caps) {
719     caps = gst_caps_new_simple ("video/quicktime", "variant", G_TYPE_STRING,
720         "mss-fragmented", "timescale", G_TYPE_UINT64,
721         gst_mss_stream_get_timescale (stream->manifest_stream), "media-caps",
722         GST_TYPE_CAPS, media_caps, NULL);
723     gst_caps_unref (media_caps);
724     gst_pad_set_caps (pad, caps);
725     gst_caps_unref (caps);
726
727     gst_pad_set_active (pad, TRUE);
728     GST_INFO_OBJECT (mssdemux, "Adding srcpad %s:%s with caps %" GST_PTR_FORMAT,
729         GST_DEBUG_PAD_NAME (pad), caps);
730     gst_object_ref (pad);
731     gst_element_add_pad (GST_ELEMENT_CAST (mssdemux), pad);
732   } else {
733     GST_WARNING_OBJECT (mssdemux,
734         "Couldn't get caps from manifest stream %p %s, not exposing it", stream,
735         GST_PAD_NAME (stream->pad));
736     return FALSE;
737   }
738   return TRUE;
739 }
740
741 static gboolean
742 gst_mss_demux_process_manifest (GstMssDemux * mssdemux)
743 {
744   GstQuery *query;
745   gchar *uri = NULL;
746   gboolean ret;
747   GSList *iter;
748
749   g_return_val_if_fail (mssdemux->manifest_buffer != NULL, FALSE);
750   g_return_val_if_fail (mssdemux->manifest == NULL, FALSE);
751
752   query = gst_query_new_uri ();
753   ret = gst_pad_peer_query (mssdemux->sinkpad, query);
754   if (ret) {
755     gchar *baseurl_end;
756     gst_query_parse_uri (query, &uri);
757     GST_INFO_OBJECT (mssdemux, "Upstream is using URI: %s", uri);
758
759     mssdemux->manifest_uri = g_strdup (uri);
760     baseurl_end = g_strrstr (uri, "/Manifest");
761     if (baseurl_end) {
762       /* set the new end of the string */
763       baseurl_end[0] = '\0';
764     } else {
765       GST_WARNING_OBJECT (mssdemux, "Stream's URI didn't end with /manifest");
766     }
767
768     mssdemux->base_url = uri;
769   }
770   gst_query_unref (query);
771
772   if (mssdemux->base_url == NULL) {
773     GST_ELEMENT_ERROR (mssdemux, RESOURCE, NOT_FOUND,
774         (_("Couldn't get the Manifest's URI")),
775         ("need to get the manifest's URI from upstream elements"));
776     return FALSE;
777   }
778
779   mssdemux->manifest = gst_mss_manifest_new (mssdemux->manifest_buffer);
780   if (!mssdemux->manifest) {
781     GST_ELEMENT_ERROR (mssdemux, STREAM, FORMAT, ("Bad manifest file"),
782         ("Xml manifest file couldn't be parsed"));
783     return FALSE;
784   }
785
786   GST_INFO_OBJECT (mssdemux, "Live stream: %d",
787       gst_mss_manifest_is_live (mssdemux->manifest));
788
789   gst_mss_demux_create_streams (mssdemux);
790   for (iter = mssdemux->streams; iter;) {
791     GSList *current = iter;
792     GstMssDemuxStream *stream = iter->data;
793     iter = g_slist_next (iter); /* do it ourselves as we want it done in the beginning of the loop */
794     if (!gst_mss_demux_expose_stream (mssdemux, stream)) {
795       gst_mss_demux_stream_free (stream);
796       mssdemux->streams = g_slist_delete_link (mssdemux->streams, current);
797     }
798   }
799
800   if (!mssdemux->streams) {
801     /* no streams */
802     GST_WARNING_OBJECT (mssdemux, "Couldn't identify the caps for any of the "
803         "streams found in the manifest");
804     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
805         (_("This file contains no playable streams.")),
806         ("No known stream formats found at the Manifest"));
807     return FALSE;
808   }
809
810   gst_element_no_more_pads (GST_ELEMENT_CAST (mssdemux));
811   return TRUE;
812 }
813
814 static void
815 gst_mss_demux_reload_manifest (GstMssDemux * mssdemux)
816 {
817   GstUriDownloader *downloader;
818   GstFragment *manifest_data;
819   GstBuffer *manifest_buffer;
820
821   downloader = gst_uri_downloader_new ();
822
823   manifest_data =
824       gst_uri_downloader_fetch_uri (downloader, mssdemux->manifest_uri);
825   manifest_buffer = gst_fragment_get_buffer (manifest_data);
826   g_object_unref (manifest_data);
827
828   gst_mss_manifest_reload_fragments (mssdemux->manifest, manifest_buffer);
829   gst_buffer_replace (&mssdemux->manifest_buffer, manifest_buffer);
830   gst_buffer_unref (manifest_buffer);
831
832   g_object_unref (downloader);
833 }
834
835 static void
836 gst_mss_demux_reconfigure (GstMssDemux * mssdemux)
837 {
838   GSList *oldpads = NULL;
839   GSList *iter;
840
841   gst_mss_demux_stop_tasks (mssdemux, TRUE);
842   if (gst_mss_manifest_change_bitrate (mssdemux->manifest,
843           mssdemux->connection_speed)) {
844     GstClockTime newseg_ts = GST_CLOCK_TIME_NONE;
845
846     GST_DEBUG_OBJECT (mssdemux, "Creating new pad group");
847     /* if we changed the bitrate, we need to add new pads */
848     for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
849       GstMssDemuxStream *stream = iter->data;
850       GstPad *oldpad = stream->pad;
851       GstClockTime ts = GST_CLOCK_TIME_NONE;
852
853       oldpads = g_slist_prepend (oldpads, oldpad);
854
855       /* since we are flushing the queue, get the next un-pushed timestamp to seek
856        * and avoid gaps */
857       gst_data_queue_set_flushing (stream->dataqueue, FALSE);
858       if (!gst_data_queue_is_empty (stream->dataqueue)) {
859         GstDataQueueItem *item = NULL;
860
861         while (!gst_data_queue_is_empty (stream->dataqueue)
862             && !GST_CLOCK_TIME_IS_VALID (ts)) {
863           gst_data_queue_pop (stream->dataqueue, &item);
864
865           if (!item) {
866             g_assert_not_reached ();
867             break;
868           }
869
870           if (GST_IS_BUFFER (item->object)) {
871             GstBuffer *buffer = GST_BUFFER_CAST (item->object);
872
873             ts = GST_BUFFER_TIMESTAMP (buffer);
874           }
875           item->destroy (item);
876         }
877
878       }
879       if (!GST_CLOCK_TIME_IS_VALID (ts)) {
880         ts = gst_mss_stream_get_fragment_gst_timestamp
881             (stream->manifest_stream);
882       }
883
884       if (ts < newseg_ts)
885         newseg_ts = ts;
886
887       GST_DEBUG_OBJECT (mssdemux,
888           "Seeking stream %p %s to ts %" GST_TIME_FORMAT, stream,
889           GST_PAD_NAME (stream->pad), GST_TIME_ARGS (ts));
890       gst_mss_stream_seek (stream->manifest_stream, ts);
891       gst_data_queue_flush (stream->dataqueue);
892
893       stream->pad = _create_pad (mssdemux, stream->manifest_stream);
894       gst_mss_demux_expose_stream (mssdemux, stream);
895
896       gst_pad_push_event (oldpad, gst_event_new_eos ());
897     }
898
899     gst_element_no_more_pads (GST_ELEMENT (mssdemux));
900
901     for (iter = oldpads; iter; iter = g_slist_next (iter)) {
902       GstPad *oldpad = iter->data;
903
904       gst_pad_set_active (oldpad, FALSE);
905       gst_element_remove_pad (GST_ELEMENT (mssdemux), oldpad);
906       gst_object_unref (oldpad);
907     }
908     for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
909       GstMssDemuxStream *stream = iter->data;
910
911       stream->pending_newsegment =
912           gst_event_new_new_segment (TRUE, 1.0, GST_FORMAT_TIME, newseg_ts, -1,
913           newseg_ts);
914     }
915   }
916   gst_mss_demux_restart_tasks (mssdemux);
917 }
918
919 static void
920 _free_data_queue_item (gpointer obj)
921 {
922   GstDataQueueItem *item = obj;
923
924   gst_mini_object_unref (item->object);
925   g_slice_free (GstDataQueueItem, item);
926 }
927
928 static void
929 gst_mss_demux_stream_store_object (GstMssDemuxStream * stream,
930     GstMiniObject * obj)
931 {
932   GstDataQueueItem *item;
933
934   item = g_slice_new (GstDataQueueItem);
935   item->object = (GstMiniObject *) obj;
936
937   item->duration = 0;           /* we don't care */
938   item->size = 0;
939   item->visible = TRUE;
940
941   item->destroy = (GDestroyNotify) _free_data_queue_item;
942
943   if (!gst_data_queue_push (stream->dataqueue, item)) {
944     GST_DEBUG_OBJECT (stream->parent, "Failed to store object %p", obj);
945     gst_mini_object_unref (obj);
946     g_slice_free (GstDataQueueItem, item);
947   }
948 }
949
950 static GstFlowReturn
951 gst_mss_demux_stream_download_fragment (GstMssDemuxStream * stream,
952     GstBuffer ** buffer)
953 {
954   GstMssDemux *mssdemux = stream->parent;
955   gchar *path;
956   gchar *url;
957   GstFragment *fragment;
958   GstBuffer *_buffer;
959   GstFlowReturn ret = GST_FLOW_OK;
960
961   GST_DEBUG_OBJECT (mssdemux, "Getting url for stream %p", stream);
962   ret = gst_mss_stream_get_fragment_url (stream->manifest_stream, &path);
963   switch (ret) {
964     case GST_FLOW_OK:
965       break;                    /* all is good, let's go */
966     case GST_FLOW_UNEXPECTED:  /* EOS */
967       gst_mss_demux_reload_manifest (mssdemux);
968       return GST_FLOW_OK;
969       return GST_FLOW_UNEXPECTED;
970     case GST_FLOW_ERROR:
971       goto error;
972     default:
973       break;
974   }
975   if (!path) {
976     goto no_url_error;
977   }
978   GST_DEBUG_OBJECT (mssdemux, "Got url path '%s' for stream %p", path, stream);
979
980   url = g_strdup_printf ("%s/%s", mssdemux->base_url, path);
981
982   GST_DEBUG_OBJECT (mssdemux, "Got url '%s' for stream %p", url, stream);
983
984   fragment = gst_uri_downloader_fetch_uri (stream->downloader, url);
985   g_free (path);
986   g_free (url);
987
988   if (!fragment) {
989     GST_INFO_OBJECT (mssdemux, "No fragment downloaded");
990     /* TODO check if we are truly stoping */
991     if (gst_mss_manifest_is_live (mssdemux->manifest)) {
992       /* looks like there is no way of knowing when a live stream has ended
993        * Have to assume we are falling behind and cause a manifest reload */
994       return GST_FLOW_OK;
995     }
996     return GST_FLOW_ERROR;
997   }
998
999   _buffer = gst_fragment_get_buffer (fragment);
1000   _buffer = gst_buffer_make_metadata_writable (_buffer);
1001   gst_buffer_set_caps (_buffer, GST_PAD_CAPS (stream->pad));
1002   GST_BUFFER_TIMESTAMP (_buffer) =
1003       gst_mss_stream_get_fragment_gst_timestamp (stream->manifest_stream);
1004   GST_BUFFER_DURATION (_buffer) =
1005       gst_mss_stream_get_fragment_gst_duration (stream->manifest_stream);
1006
1007   g_object_unref (fragment);
1008
1009   if (buffer)
1010     *buffer = _buffer;
1011
1012   if (_buffer) {
1013     GST_DEBUG_OBJECT (mssdemux,
1014         "Storing buffer for stream %p - %s. Timestamp: %" GST_TIME_FORMAT
1015         " Duration: %" GST_TIME_FORMAT,
1016         stream, GST_PAD_NAME (stream->pad),
1017         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (_buffer)),
1018         GST_TIME_ARGS (GST_BUFFER_DURATION (_buffer)));
1019     gst_mss_demux_stream_store_object (stream, GST_MINI_OBJECT_CAST (_buffer));
1020   }
1021
1022   return ret;
1023
1024 no_url_error:
1025   {
1026     GST_ELEMENT_ERROR (mssdemux, STREAM, DEMUX,
1027         (_("Failed to get fragment URL.")),
1028         ("An error happened when getting fragment URL"));
1029     gst_task_stop (stream->download_task);
1030     return GST_FLOW_ERROR;
1031   }
1032 error:
1033   {
1034     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
1035     gst_task_stop (stream->download_task);
1036     return GST_FLOW_ERROR;
1037   }
1038 }
1039
1040 static void
1041 gst_mss_demux_download_loop (GstMssDemuxStream * stream)
1042 {
1043   GstMssDemux *mssdemux = stream->parent;
1044   GstBuffer *buffer = NULL;
1045   GstFlowReturn ret;
1046
1047   GST_LOG_OBJECT (mssdemux, "download loop start %p", stream);
1048
1049
1050   ret = gst_mss_demux_stream_download_fragment (stream, &buffer);
1051   switch (ret) {
1052     case GST_FLOW_OK:
1053       break;                    /* all is good, let's go */
1054     case GST_FLOW_UNEXPECTED:  /* EOS */
1055       goto eos;
1056     case GST_FLOW_ERROR:
1057       goto error;
1058     default:
1059       break;
1060   }
1061
1062   if (buffer) {
1063     gst_mss_stream_advance_fragment (stream->manifest_stream);
1064   }
1065   GST_LOG_OBJECT (mssdemux, "download loop end %p", stream);
1066   return;
1067
1068 eos:
1069   {
1070     GST_DEBUG_OBJECT (mssdemux, "Storing EOS for pad %s:%s",
1071         GST_DEBUG_PAD_NAME (stream->pad));
1072     gst_mss_demux_stream_store_object (stream,
1073         GST_MINI_OBJECT_CAST (gst_event_new_eos ()));
1074     gst_task_stop (stream->download_task);
1075     return;
1076   }
1077 error:
1078   {
1079     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
1080     gst_task_stop (stream->download_task);
1081     return;
1082   }
1083 }
1084
1085 static GstFlowReturn
1086 gst_mss_demux_select_latest_stream (GstMssDemux * mssdemux,
1087     GstMssDemuxStream ** stream)
1088 {
1089   GstFlowReturn ret = GST_FLOW_OK;
1090   GstMssDemuxStream *current = NULL;
1091   GstClockTime cur_time = GST_CLOCK_TIME_NONE;
1092   GSList *iter;
1093
1094   if (!mssdemux->streams)
1095     return GST_FLOW_ERROR;
1096
1097   for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
1098     GstClockTime time;
1099     GstMssDemuxStream *other;
1100     GstDataQueueItem *item;
1101
1102     other = iter->data;
1103     if (other->eos) {
1104       continue;
1105     }
1106
1107     if (gst_data_queue_peek (other->dataqueue, &item)) {
1108     } else {
1109       /* flushing */
1110       return GST_FLOW_WRONG_STATE;
1111     }
1112
1113     if (GST_IS_EVENT (item->object)) {
1114       /* events have higher priority */
1115       current = other;
1116       break;
1117     }
1118     time = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (item->object));
1119     if (time < cur_time) {
1120       cur_time = time;
1121       current = other;
1122     }
1123   }
1124
1125   *stream = current;
1126   if (current == NULL)
1127     ret = GST_FLOW_UNEXPECTED;
1128   return ret;
1129 }
1130
1131 static void
1132 gst_mss_demux_stream_loop (GstMssDemux * mssdemux)
1133 {
1134   GstMssDemuxStream *stream = NULL;
1135   GstFlowReturn ret;
1136   GstMiniObject *object = NULL;
1137   GstDataQueueItem *item = NULL;
1138
1139   GST_LOG_OBJECT (mssdemux, "Starting stream loop");
1140
1141   GST_OBJECT_LOCK (mssdemux);
1142   if (mssdemux->update_bitrates) {
1143     mssdemux->update_bitrates = FALSE;
1144     GST_OBJECT_UNLOCK (mssdemux);
1145
1146     GST_DEBUG_OBJECT (mssdemux,
1147         "Starting streams reconfiguration due to bitrate changes");
1148     gst_mss_demux_reconfigure (mssdemux);
1149     GST_DEBUG_OBJECT (mssdemux, "Finished streams reconfiguration");
1150   } else {
1151     GST_OBJECT_UNLOCK (mssdemux);
1152   }
1153
1154   ret = gst_mss_demux_select_latest_stream (mssdemux, &stream);
1155
1156   if (stream)
1157     GST_DEBUG_OBJECT (mssdemux,
1158         "Stream loop selected %p stream of pad %s. %d - %s", stream,
1159         GST_PAD_NAME (stream->pad), ret, gst_flow_get_name (ret));
1160   else
1161     GST_DEBUG_OBJECT (mssdemux, "No streams selected -> %d - %s", ret,
1162         gst_flow_get_name (ret));
1163
1164   switch (ret) {
1165     case GST_FLOW_OK:
1166       break;
1167     case GST_FLOW_ERROR:
1168       goto error;
1169     case GST_FLOW_UNEXPECTED:
1170       goto eos;
1171     case GST_FLOW_WRONG_STATE:
1172       GST_DEBUG_OBJECT (mssdemux, "Wrong state, stopping task");
1173       goto stop;
1174     default:
1175       g_assert_not_reached ();
1176   }
1177
1178   GST_LOG_OBJECT (mssdemux, "popping next item from queue for stream %p %s",
1179       stream, GST_PAD_NAME (stream->pad));
1180   if (gst_data_queue_pop (stream->dataqueue, &item)) {
1181     if (item->object)
1182       object = gst_mini_object_ref (item->object);
1183     item->destroy (item);
1184   } else {
1185     GST_DEBUG_OBJECT (mssdemux,
1186         "Failed to get object from dataqueue on stream %p %s", stream,
1187         GST_PAD_NAME (stream->pad));
1188     goto stop;
1189   }
1190
1191   if (G_UNLIKELY (stream->pending_newsegment)) {
1192     gst_pad_push_event (stream->pad, stream->pending_newsegment);
1193     stream->pending_newsegment = NULL;
1194   }
1195
1196   if (G_LIKELY (GST_IS_BUFFER (object))) {
1197     if (GST_BUFFER_TIMESTAMP (object) != stream->next_timestamp) {
1198       GST_ERROR_OBJECT (mssdemux, "Marking buffer %p as discont buffer:%"
1199           GST_TIME_FORMAT " != expected:%" GST_TIME_FORMAT, object,
1200           GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
1201           GST_TIME_ARGS (stream->next_timestamp));
1202       GST_BUFFER_FLAG_SET (object, GST_BUFFER_FLAG_DISCONT);
1203     }
1204
1205     GST_DEBUG_OBJECT (mssdemux,
1206         "Pushing buffer %p %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
1207         " discont:%d on pad %s", object,
1208         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (object)),
1209         GST_TIME_ARGS (GST_BUFFER_DURATION (object)),
1210         GST_BUFFER_FLAG_IS_SET (object, GST_BUFFER_FLAG_DISCONT),
1211         GST_PAD_NAME (stream->pad));
1212
1213     stream->next_timestamp =
1214         GST_BUFFER_TIMESTAMP (object) + GST_BUFFER_DURATION (object);
1215
1216     ret = gst_pad_push (stream->pad, GST_BUFFER_CAST (object));
1217   } else if (GST_IS_EVENT (object)) {
1218     if (GST_EVENT_TYPE (object) == GST_EVENT_EOS)
1219       stream->eos = TRUE;
1220     GST_DEBUG_OBJECT (mssdemux, "Pushing event %p on pad %s", object,
1221         GST_PAD_NAME (stream->pad));
1222     gst_pad_push_event (stream->pad, GST_EVENT_CAST (object));
1223   } else {
1224     g_return_if_reached ();
1225   }
1226
1227   switch (ret) {
1228     case GST_FLOW_UNEXPECTED:
1229       goto eos;                 /* EOS ? */
1230     case GST_FLOW_ERROR:
1231       goto error;
1232     case GST_FLOW_NOT_LINKED:
1233       break;                    /* TODO what to do here? pause the task or just keep pushing? */
1234     case GST_FLOW_OK:
1235     default:
1236       break;
1237   }
1238
1239   GST_LOG_OBJECT (mssdemux, "Stream loop end");
1240   return;
1241
1242 eos:
1243   {
1244     GST_DEBUG_OBJECT (mssdemux, "EOS on all pads");
1245     gst_task_stop (mssdemux->stream_task);
1246     return;
1247   }
1248 error:
1249   {
1250     GST_WARNING_OBJECT (mssdemux, "Error while pushing fragment");
1251     gst_task_stop (mssdemux->stream_task);
1252     return;
1253   }
1254 stop:
1255   {
1256     GST_DEBUG_OBJECT (mssdemux, "Stopping streaming task");
1257     gst_task_stop (mssdemux->stream_task);
1258     return;
1259   }
1260 }