tizen 2.0 init
[framework/multimedia/gst-plugins-ext0.10.git] / ssdemux / src / gstssdemux.c
1
2 #ifdef HAVE_CONFIG_H
3 #  include "config.h"
4 #endif
5
6 /* FIXME 0.11: suppress warnings for deprecated API such as GStaticRecMutex
7  * with newer GLib versions (>= 2.31.0) */
8 #define GLIB_DISABLE_DEPRECATION_WARNINGS
9
10 #include <string.h>
11 //#include <gst/glib-compat-private.h>
12 #include "gstssdemux.h"
13
14 enum
15 {
16   PROP_0,
17   PROP_COOKIES,
18   PROP_ALLOW_AUDIO_ONLY,
19   PROP_FRAGMENTS_CACHE,
20   PROP_BITRATE_SWITCH_TOLERANCE,
21   PROP_LAST
22 };
23
24 static GstStaticPadTemplate ssdemux_videosrc_template =
25 GST_STATIC_PAD_TEMPLATE ("video",
26     GST_PAD_SRC,
27     GST_PAD_SOMETIMES,
28     GST_STATIC_CAPS_ANY);
29
30 static GstStaticPadTemplate ssdemux_audiosrc_template =
31 GST_STATIC_PAD_TEMPLATE ("audio",
32     GST_PAD_SRC,
33     GST_PAD_SOMETIMES,
34     GST_STATIC_CAPS_ANY);
35
36 static GstStaticPadTemplate ssdemux_subsrc_template =
37 GST_STATIC_PAD_TEMPLATE ("subtitle",
38     GST_PAD_SRC,
39     GST_PAD_SOMETIMES,
40     GST_STATIC_CAPS_ANY);
41
42 static GstStaticPadTemplate ssdemux_sink_template =
43 GST_STATIC_PAD_TEMPLATE ("sink",
44     GST_PAD_SINK,
45     GST_PAD_ALWAYS,
46     GST_STATIC_CAPS ("application/x-ss")); // Need to decide source mimetype
47
48 GST_DEBUG_CATEGORY_STATIC (gst_ss_demux_debug);
49 #define GST_CAT_DEFAULT gst_ss_demux_debug
50
51 #undef SIMULATE_AUDIO_ONLY /* enable to simulate audio only case forcibly */
52
53 static void
54 _do_init (GType type)
55 {
56   GST_DEBUG_CATEGORY_INIT (gst_ss_demux_debug, "ssdemux", 0,
57       "ssdemux element");
58 }
59
60 GST_BOILERPLATE_FULL (GstSSDemux, gst_ss_demux, GstElement,
61     GST_TYPE_ELEMENT, _do_init);
62
63 #define DEFAULT_FRAGMENTS_CACHE 0
64 #define DEFAULT_BITRATE_SWITCH_TOLERANCE 0.4
65
66 struct _GstSSDemuxStream
67 {
68   /* Streaming task */
69   void *parent;
70   GstPad *pad;
71   gchar *name;
72   SS_STREAM_TYPE type;
73   GstTask *stream_task;
74   GStaticRecMutex stream_lock;
75   GstElement *pipe;
76   GstElement *urisrc;
77   GstElement *parser;
78   GstElement *sink;
79   GstBus *bus;
80   GMutex *lock;
81   GCond *cond;
82   guint frag_cnt;
83   GQueue *queue;
84   gchar *uri;
85   guint64 start_ts;
86   gboolean sent_ns;
87   GstCaps *caps;
88   guint64 switch_ts;
89   guint64 avg_dur;
90 };
91
92 static void gst_ss_demux_set_property (GObject * object, guint prop_id,
93     const GValue * value, GParamSpec * pspec);
94 static void gst_ss_demux_get_property (GObject * object, guint prop_id,
95         GValue * value, GParamSpec * pspec);
96 static gboolean gst_ss_demux_sink_event (GstPad * pad, GstEvent * event);
97 static GstStateChangeReturn gst_ss_demux_change_state (GstElement * element, GstStateChange transition);
98 static void gst_ss_demux_dispose (GObject * obj);
99 static GstFlowReturn gst_ss_demux_chain (GstPad * pad, GstBuffer * buf);
100 static void gst_ss_demux_stream_loop (GstSSDemux * demux);
101 static gboolean gst_ss_demux_download_bus_cb(GstBus *bus, GstMessage *msg, gpointer data);
102 static void gst_ss_demux_stream_init (GstSSDemux *demux, GstSSDemuxStream *stream, SS_STREAM_TYPE stream_type);
103 static void gst_ss_demux_stream_free (GstSSDemux * demux, GstSSDemuxStream * stream);
104 static void gst_ssm_demux_on_new_buffer (GstElement * appsink, void* data);
105 static gboolean gst_ss_demux_download_fragment (GstSSDemux *demux, GstSSDemuxStream *stream, const gchar * uri, guint64 start_ts);
106 static gboolean gst_ss_demux_create_download_pipe (GstSSDemux * demux, GstSSDemuxStream *stream, const gchar * uri, guint64 start_ts);
107 static void gst_ss_demux_stop (GstSSDemux * demux, GstSSDemuxStream *stream);
108 static gboolean gst_ss_demux_create_dummy_pipe (GstSSDemux * demux, GstSSDemuxStream *stream);
109 static gboolean gst_ss_demux_create_dummy_sender(GstSSDemux *demux, GstSSDemuxStream *stream);
110
111 static void
112 gst_ss_demux_base_init (gpointer g_class)
113 {
114   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
115
116   gst_element_class_add_pad_template (element_class,
117       gst_static_pad_template_get (&ssdemux_videosrc_template));
118   gst_element_class_add_pad_template (element_class,
119       gst_static_pad_template_get (&ssdemux_audiosrc_template));
120   gst_element_class_add_pad_template (element_class,
121       gst_static_pad_template_get (&ssdemux_subsrc_template));
122   gst_element_class_add_pad_template (element_class,
123       gst_static_pad_template_get (&ssdemux_sink_template));
124
125   gst_element_class_set_details_simple (element_class,
126       "SS Demuxer",
127       "Demuxer/URIList",
128       "Smooth Streaming demuxer",
129       "Naveen Cherukuri<naveen.ch@samsung.com>");
130 }
131
132 static void
133 gst_ss_demux_class_init (GstSSDemuxClass * klass)
134 {
135   GObjectClass *gobject_class;
136   GstElementClass *gstelement_class;
137
138   gobject_class = (GObjectClass *) klass;
139   gstelement_class = (GstElementClass *) klass;
140
141   gobject_class->set_property = gst_ss_demux_set_property;
142   gobject_class->get_property = gst_ss_demux_get_property;
143   gobject_class->dispose = gst_ss_demux_dispose;
144
145   /* to share cookies with other sessions */
146   g_object_class_install_property (gobject_class, PROP_COOKIES,
147       g_param_spec_boxed ("cookies", "Cookies", "HTTP request cookies",
148           G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
149
150   /* will be considered only in LIVE case */
151   g_object_class_install_property (gobject_class, PROP_ALLOW_AUDIO_ONLY,
152       g_param_spec_boolean ("allow-audio-only", "Allow audio only when downloadrate is less in live case",
153           "Allow audio only stream download in live case when download rate is less",
154           TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
155
156   /* no.of fragments to cache */
157   g_object_class_install_property (gobject_class, PROP_FRAGMENTS_CACHE,
158       g_param_spec_uint ("fragments-cache", "Fragments cache",
159           "Number of fragments needed to be cached to start playing",
160           0, G_MAXUINT, DEFAULT_FRAGMENTS_CACHE,
161           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
162
163   g_object_class_install_property (gobject_class, PROP_BITRATE_SWITCH_TOLERANCE,
164       g_param_spec_float ("bitrate-switch-tolerance",
165           "Bitrate switch tolerance",
166           "Tolerance with respect of the fragment duration to switch to "
167           "a different bitrate if the client is too slow/fast.",
168           0, 1, DEFAULT_BITRATE_SWITCH_TOLERANCE,
169           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
170
171   gstelement_class->change_state =
172       GST_DEBUG_FUNCPTR (gst_ss_demux_change_state);
173 }
174
175 static void
176 gst_ss_demux_init (GstSSDemux * demux, GstSSDemuxClass * klass)
177 {
178   /* sink pad */
179   demux->sinkpad = gst_pad_new_from_static_template (&ssdemux_sink_template, "sink");
180   gst_pad_set_chain_function (demux->sinkpad,
181       GST_DEBUG_FUNCPTR (gst_ss_demux_chain));
182   gst_pad_set_event_function (demux->sinkpad,
183       GST_DEBUG_FUNCPTR (gst_ss_demux_sink_event));
184   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
185
186   demux->need_cache = TRUE;
187   demux->fragments_cache = DEFAULT_FRAGMENTS_CACHE;
188   demux->cancelled = FALSE;
189   demux->cookies = NULL;
190   demux->ss_mode = SS_MODE_NO_SWITCH;
191   demux->switch_eos = FALSE;
192 }
193
194 static void
195 gst_ss_demux_dispose (GObject * obj)
196 {
197   GstSSDemux *demux = GST_SS_DEMUX (obj);
198   int n =0;
199
200   for (n = 0; n < SS_STREAM_NUM; n++) {
201     if (demux->streams[n]) {
202       gst_ss_demux_stream_free (demux, demux->streams[n]);
203       demux->streams[n] = NULL;
204     }
205   }
206
207   if (demux->parser) {
208     gst_ssm_parse_free (demux->parser);
209     demux->parser = NULL;
210   }
211
212   G_OBJECT_CLASS (parent_class)->dispose (obj);
213 }
214
215 static void
216 gst_ss_demux_set_property (GObject * object, guint prop_id,
217     const GValue * value, GParamSpec * pspec)
218 {
219   GstSSDemux *demux = GST_SS_DEMUX (object);
220
221   switch (prop_id) {
222      case PROP_COOKIES:
223       g_strfreev (demux->cookies);
224       demux->cookies = g_strdupv (g_value_get_boxed (value));
225       break;
226     case PROP_ALLOW_AUDIO_ONLY:
227       demux->allow_audio_only = g_value_get_boolean (value);
228       break;
229     case PROP_FRAGMENTS_CACHE:
230       demux->fragments_cache = g_value_get_uint (value);
231       break;
232     case PROP_BITRATE_SWITCH_TOLERANCE:
233       demux->bitrate_switch_tol = g_value_get_float (value);
234       break;
235     default:
236       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
237       break;
238   }
239 }
240
241 static void
242 gst_ss_demux_get_property (GObject * object, guint prop_id, GValue * value,
243     GParamSpec * pspec)
244 {
245   GstSSDemux *demux = GST_SS_DEMUX (object);
246
247   switch (prop_id) {
248     case PROP_COOKIES:
249       g_value_set_boxed (value, g_strdupv (demux->cookies));
250       break;
251     case PROP_ALLOW_AUDIO_ONLY:
252       g_value_set_boolean (value, demux->allow_audio_only);
253       break;
254     case PROP_FRAGMENTS_CACHE:
255       g_value_set_uint (value, demux->fragments_cache);
256       break;
257     case PROP_BITRATE_SWITCH_TOLERANCE:
258       g_value_set_float (value, demux->bitrate_switch_tol);
259       break;
260     default:
261       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
262       break;
263   }
264 }
265
266 static gboolean
267 gst_ss_demux_sink_event (GstPad * pad, GstEvent * event)
268 {
269   GstSSDemux *demux = GST_SS_DEMUX (gst_pad_get_parent (pad));
270   GstQuery *query;
271   gboolean ret;
272   gchar *uri;
273
274   switch (event->type) {
275     case GST_EVENT_EOS: {
276       int i = 0;
277       if (demux->manifest == NULL) {
278         GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
279         break;
280       }
281
282       GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: mainifest file fetched");
283
284       query = gst_query_new_uri ();
285       ret = gst_pad_peer_query (demux->sinkpad, query);
286       if (ret) {
287         gst_query_parse_uri (query, &uri);
288         demux->parser = gst_ssm_parse_new (uri);
289         g_free (uri);
290       } else {
291         GST_ERROR_OBJECT (demux, "failed to query URI from upstream");
292         return FALSE;
293       }
294       gst_query_unref (query);
295
296       GST_LOG_OBJECT (demux, "data = %p & size = %d", GST_BUFFER_DATA(demux->manifest), GST_BUFFER_SIZE(demux->manifest));
297       if (!gst_ssm_parse_manifest (demux->parser, (char *)GST_BUFFER_DATA(demux->manifest), GST_BUFFER_SIZE(demux->manifest))) {
298         /* In most cases, this will happen if we set a wrong url in the
299          * source element and we have received the 404 HTML response instead of
300          * the playlist */
301         GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid playlist."),
302             (NULL));
303         return FALSE;
304       }
305
306       for( i = 0; i < SS_STREAM_NUM; i++) {
307         if (gst_ssm_parse_check_stream (demux->parser, i)) {
308           GstSSDemuxStream *stream = g_new0 (GstSSDemuxStream, 1);
309
310           // Add pad emission of the stream
311           gst_ss_demux_stream_init (demux, stream, i);
312           g_static_rec_mutex_init (&stream->stream_lock);
313           stream->stream_task = gst_task_create ((GstTaskFunction) gst_ss_demux_stream_loop, demux);
314           gst_task_set_lock (stream->stream_task, &stream->stream_lock);
315           demux->streams[i] = stream;
316           g_print ("Starting stream - %d task loop...\n", i);
317           gst_task_start (stream->stream_task);
318         }
319       }
320
321       gst_event_unref (event);
322       return TRUE;
323     }
324     case GST_EVENT_NEWSEGMENT:
325       /* Swallow newsegments, we'll push our own */
326       gst_event_unref (event);
327       return TRUE;
328     default:
329       break;
330   }
331
332   return gst_pad_event_default (pad, event);
333 }
334
335 static gboolean
336 gst_ss_demux_handle_src_query (GstPad * pad, GstQuery * query)
337 {
338   gboolean res = FALSE;
339   GstSSDemux *ssdemux = GST_SS_DEMUX (gst_pad_get_parent (pad));
340
341   GST_LOG_OBJECT (pad, "%s query", GST_QUERY_TYPE_NAME (query));
342
343   // TODO: need to add other query types as well
344
345   switch (GST_QUERY_TYPE (query)) {
346     case GST_QUERY_DURATION:{
347       GstFormat fmt;
348
349       gst_query_parse_duration (query, &fmt, NULL);
350       if (fmt == GST_FORMAT_TIME) {
351         gint64 duration = -1;
352
353         duration = gst_util_uint64_scale (GST_SSM_PARSE_GET_DURATION(ssdemux->parser), GST_SECOND,
354                 GST_SSM_PARSE_GET_TIMESCALE(ssdemux->parser));
355         if (duration > 0) {
356           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
357           res = TRUE;
358         }
359       }
360       break;
361     }
362     default:
363       res = gst_pad_query_default (pad, query);
364       break;
365   }
366
367   gst_object_unref (ssdemux);
368
369   return res;
370 }
371
372
373 static gboolean
374 gst_ss_demux_handle_src_event (GstPad * pad, GstEvent * event)
375 {
376   GstSSDemux *demux = GST_SS_DEMUX (gst_pad_get_parent (pad));
377
378   switch (event->type) {
379     case GST_EVENT_SEEK:
380     {
381       gdouble rate;
382       GstFormat format;
383       GstSeekFlags flags;
384       GstSeekType start_type, stop_type;
385       gint64 start, stop;
386       gint i = 0;
387       GstSSDemuxStream *stream = NULL;
388
389       GST_INFO_OBJECT (demux, "Received GST_EVENT_SEEK");
390
391       // TODO: should be able to seek in DVR window
392       if (GST_SSM_PARSE_IS_LIVE_PRESENTATION (demux->parser)) {
393         GST_WARNING_OBJECT (demux, "Received seek event for live stream");
394         return FALSE;
395       }
396
397       gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
398           &stop_type, &stop);
399
400       if (format != GST_FORMAT_TIME) {
401         GST_WARNING_OBJECT (demux, "Only time format is supported in seek");
402         return FALSE;
403       }
404
405       GST_DEBUG_OBJECT (demux, "seek event, rate: %f start: %" GST_TIME_FORMAT
406           " stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start),
407           GST_TIME_ARGS (stop));
408
409
410       for( i = 0; i < SS_STREAM_NUM; i++) {
411         if (stream = demux->streams[i]) {
412           g_cond_signal (stream->cond);
413           gst_task_stop (stream->stream_task);
414         }
415       }
416
417       if (flags & GST_SEEK_FLAG_FLUSH) {
418         GST_INFO_OBJECT (demux, "sending flush start");
419
420         for( i = 0; i < SS_STREAM_NUM; i++) {
421           if (stream = demux->streams[i]) {
422             gst_pad_push_event (stream->pad, gst_event_new_flush_start ());
423           }
424         }
425       }
426
427       gst_ssm_parse_seek_manifest (demux->parser, start);
428
429       if (flags & GST_SEEK_FLAG_FLUSH) {
430         GST_INFO_OBJECT (demux, "sending flush stop");
431         for( i = 0; i < SS_STREAM_NUM; i++) {
432           if (stream = demux->streams[i]) {
433             gst_pad_push_event (stream->pad, gst_event_new_flush_stop ());
434             GST_LOG_OBJECT (stream->pad, "Starting pad TASK again...\n");
435             stream->sent_ns = FALSE;
436             stream->frag_cnt = 0; /*resetting to start buffering on SEEK */
437             gst_task_start (stream->stream_task);
438           }
439         }
440       }
441
442       return TRUE;
443     }
444     default:
445       break;
446   }
447
448   return gst_pad_event_default (pad, event);
449 }
450
451 static GstStateChangeReturn
452 gst_ss_demux_change_state (GstElement * element, GstStateChange transition)
453 {
454   GstStateChangeReturn ret;
455
456   switch (transition) {
457     case GST_STATE_CHANGE_READY_TO_PAUSED:
458       break;
459     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
460       break;
461     default:
462       break;
463   }
464
465   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
466
467   switch (transition) {
468     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
469       break;
470     case GST_STATE_CHANGE_PAUSED_TO_READY:
471       break;
472     default:
473       break;
474   }
475   return ret;
476 }
477
478
479 static GstFlowReturn
480 gst_ss_demux_chain (GstPad * pad, GstBuffer * buf)
481 {
482   GstSSDemux *demux = GST_SS_DEMUX (gst_pad_get_parent (pad));
483
484   if (demux->manifest == NULL)
485     demux->manifest = buf;
486   else
487     demux->manifest = gst_buffer_join (demux->manifest, buf);
488   gst_object_unref (demux);
489
490   return GST_FLOW_OK;
491 }
492
493
494 static gboolean
495 gst_ss_demux_get_next_fragment (GstSSDemux * demux, SS_STREAM_TYPE stream_type)
496 {
497   GstSSDemuxStream *stream = demux->streams[stream_type];
498   gchar *next_fragment_uri = NULL;
499   guint64 start_ts = 0;
500
501   if (!gst_ssm_parse_get_next_fragment_url (demux->parser, stream_type, &next_fragment_uri, &start_ts )) {
502     GST_INFO_OBJECT (demux, "This Manifest does not contain more fragments");
503     goto end_of_list;
504   }
505
506   GST_ERROR_OBJECT (demux, "Fetching next fragment %s", next_fragment_uri);
507
508   stream->uri = g_strdup(next_fragment_uri);
509   stream->start_ts = start_ts;
510
511   if (!gst_ss_demux_download_fragment (demux, stream, next_fragment_uri, start_ts)) {
512     GST_ERROR_OBJECT (demux, "failed to download fragment...");
513     goto error;
514   }
515
516   return TRUE;
517
518 error:
519   {
520     gst_ss_demux_stop (demux, stream);
521     return FALSE;
522   }
523 end_of_list:
524   {
525     GST_INFO_OBJECT (demux, "Reached end of playlist, sending EOS");
526     gst_pad_push_event (stream->pad, gst_event_new_eos ());
527     gst_ss_demux_stop (demux, stream);
528     return TRUE;
529   }
530 }
531
532 static void
533 gst_ss_demux_stream_loop (GstSSDemux * demux)
534 {
535   GThread *self = NULL;
536   int stream_type = 0;
537   GstSSDemuxStream *stream = NULL;
538
539   self = g_thread_self ();
540
541   for (stream_type = 0; stream_type < SS_STREAM_NUM; stream_type++) {
542     if (demux->streams[stream_type]->stream_task->abidata.ABI.thread == self) {
543       stream = demux->streams[stream_type];
544       break;
545     }
546   }
547
548   /* download next fragment of stream_type */
549   if (!gst_ss_demux_get_next_fragment (demux, stream_type)) {
550     GST_ERROR_OBJECT (demux, "failed to get next fragment...");
551     goto error;
552   }
553
554   return;
555
556 error:
557   {
558     gst_task_pause (stream->stream_task);
559     GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
560           ("could not download fragments"), (NULL));
561       gst_ss_demux_stop (demux, stream);
562     return;
563   }
564 }
565
566 static gboolean
567 gst_ss_demux_download_fragment (GstSSDemux *demux, GstSSDemuxStream *stream, const gchar * uri, guint64 start_ts)
568 {
569   GstStateChangeReturn ret;
570
571   g_print ("Going to download fragment : %s\n", uri);
572   if (!gst_ss_demux_create_download_pipe (demux, stream, uri, start_ts)) {
573     GST_ERROR_OBJECT (demux, "failed to create download pipeline");
574     return FALSE;
575   }
576
577   ret = gst_element_set_state (stream->pipe, GST_STATE_PLAYING);
578   if (ret == GST_STATE_CHANGE_FAILURE) {
579     GST_ERROR_OBJECT (demux, "set_state failed...");
580     return FALSE;
581   }
582
583   if (stream->pipe && demux->ss_mode == SS_MODE_AONLY &&
584     stream->type == SS_STREAM_VIDEO) {
585
586     GST_DEBUG_OBJECT (demux, "Waiting to fetch the URI");
587     g_mutex_lock (stream->lock);
588     g_cond_wait (stream->cond, stream->lock);
589     GST_INFO_OBJECT (stream->pad, "Recived signal to shutdown...");
590     g_mutex_unlock (stream->lock);
591
592     /* put live pipeline to PAUSED state to unlink urisrc & piffdemux */
593     gst_element_set_state (stream->pipe, GST_STATE_NULL);
594     gst_element_get_state (stream->pipe, NULL, NULL, GST_CLOCK_TIME_NONE);
595
596     stream->pipe = NULL;
597
598     stream->switch_ts = stream->start_ts;
599
600     /* create dummy frame sender */
601     if (!gst_ss_demux_create_dummy_sender (demux, stream)) {
602       GST_ERROR_OBJECT (demux, "failed to create dummy sender pipeline...");
603       GST_ELEMENT_ERROR (demux, RESOURCE, FAILED, ("Unable to create dummy pipe."), (NULL));
604       return FALSE;
605     }
606   }
607
608   /* wait until:
609    *   - the download succeed (EOS)
610    *   - the download failed (Error message on the fetcher bus)
611    *   - the download was canceled
612    */
613   GST_DEBUG_OBJECT (demux, "Waiting to fetch the URI");
614   g_mutex_lock (stream->lock);
615   g_cond_wait (stream->cond, stream->lock);
616   GST_INFO_OBJECT (stream->pad, "Recived signal to shutdown...");
617   g_mutex_unlock (stream->lock);
618
619   gst_element_set_state (stream->pipe, GST_STATE_NULL);
620   gst_element_get_state (stream->pipe, NULL, NULL, GST_CLOCK_TIME_NONE);
621   stream->pipe = NULL;
622
623   return TRUE;
624 }
625
626 static gboolean
627 gst_ss_demux_create_dummy_sender(GstSSDemux *demux, GstSSDemuxStream *stream)
628 {
629   GstStateChangeReturn ret;
630
631   if (!gst_ss_demux_create_dummy_pipe (demux, stream)) {
632     GST_ERROR_OBJECT (demux, "failed to create download pipeline");
633     return FALSE;
634   }
635
636   ret = gst_element_set_state (stream->pipe, GST_STATE_PLAYING);
637   if (ret == GST_STATE_CHANGE_FAILURE) {
638     GST_ERROR_OBJECT (demux, "set_state failed...");
639     return FALSE;
640   }
641
642 #if 0
643   GST_DEBUG_OBJECT (demux, "Waiting to download next video URI");
644   g_mutex_lock (stream->lock);
645   g_cond_wait (stream->cond, stream->lock);
646   if (stream->pipe) {
647     gst_element_set_state (stream->pipe, GST_STATE_NULL);
648     gst_element_get_state (stream->pipe, NULL, NULL, GST_CLOCK_TIME_NONE);
649     stream->pipe = NULL;
650   }
651   g_mutex_unlock (stream->lock);
652 #endif
653
654   return TRUE;
655 }
656
657 static void
658 gst_ss_demux_append_live_params(GstElement *piffparser, piff_live_param_t *param, gpointer data)
659 {
660   GstSSDemuxStream *stream = (GstSSDemuxStream *)data;
661   GstSSDemux *demux = stream->parent;
662   int i =0;
663   guint64 timestamp = 0;
664   guint64 duration = 0;
665
666   GST_LOG_OBJECT (demux, "received signal structs count = %d\n", param->count);
667
668   for (i = 0 ; i< param->count; i++) {
669     if (param->long_info) {
670       piff_fragment_longtime_info *info = &(param->long_info[i]);
671       timestamp = info->ts;
672       duration = info->duration;
673     } else if (param->info) {
674       piff_fragment_time_info *info = &(param->info[i]);
675       timestamp = info->ts;
676       duration = info->duration;
677     }
678
679     GST_LOG_OBJECT (demux, "Received ts = %llu and dur = %llu\n", timestamp, duration);
680
681     if (!gst_ssm_parse_append_next_fragment (demux->parser, stream->type, timestamp, duration)) {
682       GST_ERROR_OBJECT (demux, "failed to append new fragment");
683       GST_ELEMENT_ERROR (demux, RESOURCE, NO_SPACE_LEFT, ("fragment allocation failed..."), (NULL));
684       return;
685     }
686   }
687
688   if (param->long_info) {
689     free (param->long_info);
690     param->long_info = NULL;
691   }
692
693   if (param->info) {
694     free (param->info);
695     param->info = NULL;
696   }
697
698   free (param);
699
700   if ((stream->type == SS_STREAM_VIDEO) && (demux->ss_mode == SS_MODE_AONLY)) {
701     g_print ("\n\n\t\tSignalling download pipe shutdonw....\n\n");
702
703     g_object_get (stream->parser, "frame-dur", &stream->avg_dur, NULL);
704     g_print ("frame duration = %"GST_TIME_FORMAT"\n\n\n", GST_TIME_ARGS(stream->avg_dur));
705     g_cond_signal (stream->cond);
706   }
707
708 }
709
710 static gboolean
711 gst_ss_demux_create_download_pipe (GstSSDemux * demux, GstSSDemuxStream *stream, const gchar * uri, guint64 start_ts)
712 {
713   gchar *name = NULL;
714   GstCaps *caps = NULL;
715
716   if (!gst_uri_is_valid (uri))
717     return FALSE;
718
719   name = g_strdup_printf("%s-%s", stream->name, "downloader");
720
721   stream->pipe = gst_pipeline_new (name);
722   if (!stream->pipe) {
723     GST_ERROR_OBJECT (demux, "failed to create pipeline");
724     return FALSE;
725   }
726   g_free(name);
727
728   name = g_strdup_printf("%s-%s", stream->name, "httpsrc");
729   GST_DEBUG ("Creating source element for the URI:%s", uri);
730   stream->urisrc = gst_element_make_from_uri (GST_URI_SRC, uri, name);
731   if (!stream->urisrc) {
732     GST_ERROR_OBJECT (demux, "failed to create urisrc");
733     return FALSE;
734   }
735   g_free(name);
736
737   if (GST_SSM_PARSE_IS_LIVE_PRESENTATION(demux->parser))
738     g_object_set (G_OBJECT (stream->urisrc), "is-live", TRUE, NULL);
739   else
740     g_object_set (G_OBJECT (stream->urisrc), "is-live", FALSE, NULL);
741
742   name = g_strdup_printf("%s-%s", stream->name, "parser");
743   stream->parser = gst_element_factory_make ("piffdemux", name);
744   if (!stream->parser) {
745     GST_ERROR_OBJECT (demux, "failed to create piffdemux element");
746     return FALSE;
747   }
748
749   caps = ssm_parse_get_stream_caps (demux->parser, stream->type);
750   GST_INFO_OBJECT (stream->pad, "prepare caps = %s", gst_caps_to_string(caps));
751
752   g_object_set (G_OBJECT (stream->parser), "caps", caps, NULL);
753   g_object_set (G_OBJECT (stream->parser), "start-ts", start_ts, NULL);
754   g_object_set (G_OBJECT (stream->parser), "duration", GST_SSM_PARSE_GET_DURATION(demux->parser), NULL);
755   g_object_set (G_OBJECT (stream->parser), "is-live", GST_SSM_PARSE_IS_LIVE_PRESENTATION(demux->parser), NULL);
756   g_object_set (G_OBJECT (stream->parser), "lookahead-count", GST_SSM_PARSE_LOOKAHEAD_COUNT(demux->parser), NULL);
757   g_signal_connect (stream->parser, "live-param",  G_CALLBACK (gst_ss_demux_append_live_params), stream);
758
759   g_free(name);
760
761   name = g_strdup_printf("%s-%s", stream->name, "sink");
762   stream->sink = gst_element_factory_make ("appsink", name);
763   if (!stream->sink) {
764     GST_ERROR_OBJECT (demux, "failed to create appsink element");
765     return FALSE;
766   }
767   g_object_set (G_OBJECT (stream->sink), "emit-signals", TRUE, "sync", FALSE, NULL);
768   g_signal_connect (stream->sink, "new-buffer",  G_CALLBACK (gst_ssm_demux_on_new_buffer), stream);
769
770   g_free(name);
771
772   gst_bin_add_many (GST_BIN (stream->pipe), stream->urisrc, stream->parser, stream->sink, NULL);
773   if (!gst_element_link_many (stream->urisrc, stream->parser, stream->sink, NULL)) {
774     GST_ERROR ("failed to link elements...");
775     return FALSE;
776   }
777
778   stream->bus = gst_pipeline_get_bus (GST_PIPELINE (stream->pipe));
779   gst_bus_add_watch (stream->bus, (GstBusFunc)gst_ss_demux_download_bus_cb, stream);
780   gst_object_unref (stream->bus);
781
782   return TRUE;
783 }
784
785 #if 0
786 static gboolean
787 gst_ss_demux_create_dummy_pipe (GstSSDemux * demux, GstSSDemuxStream *stream)
788 {
789   gchar *name = NULL;
790   GstCaps *caps = NULL;
791   GstElement *capsfilter = NULL;
792   GstElement *enc = NULL;
793   guint64 avg_dur = -1;
794   guint frame_rate = 0;
795
796   name = g_strdup_printf("%s-%s", stream->name, "dummy");
797
798   stream->pipe = gst_pipeline_new (name);
799   if (!stream->pipe) {
800     GST_ERROR_OBJECT (demux, "failed to create pipeline");
801     return FALSE;
802   }
803   g_free(name);
804
805   /* create dummy sender source */
806   name = g_strdup_printf("%s-%s", stream->name, "dummysrc");
807   stream->urisrc = gst_element_factory_make ("imagereader", name);
808   if (!stream->urisrc) {
809     GST_ERROR_OBJECT (demux,"failed to create filesrc element");
810     return FALSE;
811   }
812   g_free(name);
813   g_object_set (G_OBJECT (stream->urisrc), "location", "/opt/home/root/aonly_VGA_1frame_I420.yuv", NULL);
814   g_object_set (G_OBJECT (stream->urisrc), "framerate", 25, NULL);
815   g_object_set (G_OBJECT (stream->urisrc), "num-buffers", 60, NULL);
816
817   /* caps filter */
818   capsfilter = gst_element_factory_make ("capsfilter", NULL);
819   if (!capsfilter) {
820     GST_ERROR_OBJECT (demux, "failed to create capsfilter element");
821     return FALSE;
822   }
823   caps = gst_caps_new_simple ("video/x-raw-yuv",
824                   "width", G_TYPE_INT, 640,
825                   "height", G_TYPE_INT, 480,
826                   "framerate",GST_TYPE_FRACTION, 25,1,
827                   "format", GST_TYPE_FOURCC, GST_MAKE_FOURCC ('I', '4', '2', '0'),
828                   NULL);
829   g_object_set (G_OBJECT (capsfilter), "caps", caps,  NULL);
830
831   /* create h264parse element */
832   enc = gst_element_factory_make ("savsenc_h264", "H264 encoder");
833   if (!enc) {
834     GST_ERROR_OBJECT (demux, "failed to create h264 parse element");
835     return FALSE;
836   }
837   name = g_strdup_printf("%s-%s", stream->name, "sink");
838   stream->sink = gst_element_factory_make ("appsink", name);
839   if (!stream->sink) {
840     GST_ERROR_OBJECT (demux, "failed to create appsink element");
841     return FALSE;
842   }
843   g_free(name);
844   g_object_set (G_OBJECT (stream->sink), "emit-signals", TRUE, "sync", FALSE, NULL);
845   g_signal_connect (stream->sink, "new-buffer",  G_CALLBACK (gst_ssm_demux_on_new_buffer), stream);
846
847   /* add to pipeline & link all elements */
848   gst_bin_add_many (GST_BIN (stream->pipe), stream->urisrc, capsfilter, enc, stream->sink, NULL);
849
850   if (!gst_element_link_many (stream->urisrc, capsfilter, enc, stream->sink, NULL)) {
851     GST_ERROR_OBJECT (demux,"failed to link dummy pipe elements...");
852     return FALSE;
853   }
854
855   stream->bus = gst_pipeline_get_bus (GST_PIPELINE (stream->pipe));
856   gst_bus_add_watch (stream->bus, (GstBusFunc)gst_ss_demux_download_bus_cb, stream);
857   gst_object_unref (stream->bus);
858
859   return TRUE;
860 }
861 #else
862 static gboolean
863 gst_ss_demux_create_dummy_pipe (GstSSDemux * demux, GstSSDemuxStream *stream)
864 {
865   gchar *name = NULL;
866   GstBus *bus = NULL;
867   GstCaps *caps = NULL;
868
869   name = g_strdup_printf("%s-%s", stream->name, "dummy");
870
871   stream->pipe = gst_pipeline_new (name);
872   if (!stream->pipe) {
873     GST_ERROR_OBJECT (demux, "failed to create pipeline");
874     return FALSE;
875   }
876   g_free(name);
877
878   /* create dummy sender source */
879   name = g_strdup_printf("%s-%s", stream->name, "dummysrc");
880   stream->urisrc = gst_element_factory_make ("filesrc", name);
881   if (!stream->urisrc) {
882     GST_ERROR_OBJECT (demux,"failed to create filesrc element");
883     return FALSE;
884   }
885   g_free(name);
886   g_object_set (G_OBJECT (stream->urisrc), "location", "/opt/home/root/sound_2sec.264", NULL);
887
888   /* create appsink element */
889   name = g_strdup_printf("%s-%s", stream->name, "parser");
890   stream->parser= gst_element_factory_make ("legacyh264parse", name);
891   if (!stream->parser) {
892     GST_ERROR_OBJECT (demux, "failed to create h264 parse element");
893     return FALSE;
894   }
895   g_object_set (G_OBJECT (stream->parser), "output-format", 1, NULL);
896
897   /* create appsink element */
898   name = g_strdup_printf("%s-%s", stream->name, "sink");
899   stream->sink = gst_element_factory_make ("appsink", name);
900   if (!stream->sink) {
901     GST_ERROR_OBJECT (demux, "failed to create appsink element");
902     return FALSE;
903   }
904   g_object_set (G_OBJECT (stream->sink), "emit-signals", TRUE, "sync", FALSE, NULL);
905
906   caps = gst_caps_new_simple ("video/x-h264",
907                   "width", G_TYPE_INT, 640,
908                   "height", G_TYPE_INT, 480,
909                   "stream-format", G_TYPE_STRING, "byte-stream",
910                   NULL);
911   g_object_set (G_OBJECT (stream->sink), "caps", caps, NULL);
912
913   g_signal_connect (stream->sink, "new-buffer",  G_CALLBACK (gst_ssm_demux_on_new_buffer), stream);
914   g_free(name);
915
916   /* add to pipeline & link all elements */
917   gst_bin_add_many (GST_BIN (stream->pipe), stream->urisrc, stream->parser, stream->sink, NULL);
918   if (!gst_element_link_many (stream->urisrc, stream->parser, stream->sink, NULL)) {
919     GST_ERROR_OBJECT (demux,"failed to link elements...");
920     return FALSE;
921   }
922
923   bus = gst_pipeline_get_bus (GST_PIPELINE (stream->pipe));
924   gst_bus_add_watch (bus, (GstBusFunc)gst_ss_demux_download_bus_cb, stream);
925   gst_object_unref (bus);
926
927   return TRUE;
928 }
929
930
931 #endif
932 static gboolean
933 gst_ss_demux_download_bus_cb(GstBus *bus, GstMessage *msg, gpointer data)
934 {
935   GstSSDemuxStream *stream = (GstSSDemuxStream *)data;
936   GstSSDemux *demux = stream->parent;
937
938   switch (GST_MESSAGE_TYPE(msg)) {
939     case GST_MESSAGE_EOS: {
940       guint64 download_rate = -1;
941
942       GST_INFO_OBJECT (stream->pad, "received EOS on download pipe..");
943       // increase the fragment count on EOS
944       stream->frag_cnt++;
945
946       if (g_strrstr (gst_element_get_name(stream->urisrc), "http")) {
947         g_object_get (stream->urisrc, "download-rate", &download_rate, NULL);
948         g_print("*********** '%s' download rate = %d bps **************\n", stream->name, download_rate);
949       }
950
951       // TODO: need to remove download_rate> 0 check.. make it generic
952       if ((stream->type == SS_STREAM_VIDEO) && (demux->ss_mode != SS_MODE_AONLY) && (download_rate >= 0)) {
953         if (stream->frag_cnt >= demux->fragments_cache) {
954           /* for switching, we are considering video download rate only */
955           demux->ss_mode = gst_ssm_parse_switch_qualitylevel (demux->parser, download_rate);
956         }
957       } else if (stream->type == SS_STREAM_AUDIO && (demux->ss_mode == SS_MODE_AONLY)) {
958         /* when video is not present using audio download rate to calculate switching */
959          demux->ss_mode = gst_ssm_parse_switch_qualitylevel (demux->parser, download_rate);
960          if (demux->ss_mode != SS_MODE_AONLY) {
961            g_print ("\n\nMoving to AV mode by audio considering audio download rate\n\n\n\n");
962          }
963       }
964
965       g_cond_signal (stream->cond);
966
967 #ifdef SIMULATE_AUDIO_ONLY
968       /* when fragment count is multiple of 4, switch to audio only case */
969       if ((stream->frag_cnt % 4 == 0) && (stream->type == SS_STREAM_VIDEO) &&
970                 GST_SSM_PARSE_IS_LIVE_PRESENTATION(demux->parser)) {
971         g_print ("\n\t ######## Forcibly switching to audio only for testing ##########\n");
972         demux->ss_mode = SS_MODE_AONLY;
973       }
974   #endif
975       GST_DEBUG_OBJECT (stream->pad, "Signalling eos condition...");
976
977       GST_DEBUG_OBJECT (demux, "number of fragments downloaded = %d", stream->frag_cnt);
978       break;
979     }
980     case GST_MESSAGE_ERROR: {
981       GError *error = NULL;
982       gchar* debug = NULL;
983
984       g_print ("Error from %s\n", gst_element_get_name (GST_MESSAGE_SRC(msg)));
985
986       gst_message_parse_error( msg, &error, &debug );
987       if (error)
988         GST_ERROR_OBJECT (demux, "GST_MESSAGE_ERROR: error= %s\n", error->message);
989
990       GST_ERROR_OBJECT (demux, "GST_MESSAGE_ERROR: debug = %s\n", debug);
991
992       /* handling error, when client requests url, which is yet to be prepared by server */
993       if ((!strncmp(error->message, "Precondition Failed", strlen("Precondition Failed"))) && (5 == error->code)) {
994         GstStateChangeReturn ret;
995
996         /* wait for 1sec & request the url again */
997         // TODO: need to make wait time as generic or Adding loop count to request again & again
998         GST_INFO_OBJECT (demux, "ERROR : code = %d, msg = %s, NEED to request again", error->code, error->message);
999         usleep (1000000); // 1 sec
1000
1001         /* put the current pipeline to NULL state */
1002         gst_element_set_state (stream->pipe, GST_STATE_NULL);
1003         gst_element_get_state (stream->pipe, NULL, NULL, GST_CLOCK_TIME_NONE);
1004         stream->pipe = stream->urisrc = stream->parser = stream->sink = NULL;
1005
1006         g_print ("Going to download fragment AGAIN : %s\n", stream->uri);
1007         if (!gst_ss_demux_create_download_pipe (demux, stream, stream->uri, stream->start_ts)) {
1008           GST_ERROR_OBJECT (demux, "failed to create download pipeline");
1009           if (!gst_element_post_message (GST_ELEMENT(demux), msg)) {
1010             GST_ERROR_OBJECT (demux, "failed to post error");
1011             return FALSE;
1012           }
1013         }
1014
1015         ret = gst_element_set_state (stream->pipe, GST_STATE_PLAYING);
1016         if (ret == GST_STATE_CHANGE_FAILURE) {
1017           if (!gst_element_post_message (GST_ELEMENT(demux), msg)) {
1018             GST_ERROR_OBJECT (demux, "failed to post error");
1019             return FALSE;
1020           }
1021         }
1022
1023         } else {
1024           if (error)
1025           g_print ("GST_MESSAGE_ERROR: error= %s\n", error->message);
1026
1027           g_print ("GST_MESSAGE_ERROR: debug = %s\n", debug);
1028           if (!gst_element_post_message (GST_ELEMENT(demux), msg)) {
1029             GST_ERROR_OBJECT (demux, "failed to post error");
1030             gst_ss_demux_stop (demux, stream);
1031             g_free( debug);
1032             debug = NULL;
1033             g_error_free( error);
1034             return FALSE;
1035         }
1036         gst_ss_demux_stop (demux, stream);
1037       }
1038
1039       g_free( debug);
1040       debug = NULL;
1041       g_error_free( error);
1042       break;
1043     }
1044     case GST_MESSAGE_WARNING: {
1045       char* debug = NULL;
1046       GError* error = NULL;
1047       gst_message_parse_warning(msg, &error, &debug);
1048       GST_WARNING_OBJECT(demux, "warning : %s\n", error->message);
1049       GST_WARNING_OBJECT(demux, "debug : %s\n", debug);
1050       g_error_free( error );
1051       g_free( debug);
1052       break;
1053     }
1054     default : {
1055       GST_LOG_OBJECT(demux, "unhandled message : %s\n", gst_message_type_get_name (GST_MESSAGE_TYPE (msg)));
1056       break;
1057     }
1058   }
1059
1060   return TRUE;
1061 }
1062
1063 static void
1064 gst_ssm_demux_on_new_buffer (GstElement * appsink, void* data)
1065 {
1066   GstSSDemuxStream *stream = (GstSSDemuxStream *)data;
1067   GstSSDemux *demux = stream->parent;
1068   GstBuffer *inbuf = NULL;
1069   GstFlowReturn fret = GST_FLOW_OK;
1070
1071   inbuf = gst_app_sink_pull_buffer ((GstAppSink *)appsink);
1072   if (!inbuf) {
1073     GST_WARNING_OBJECT (demux, "Input buffer not available.,..\n");
1074     return;
1075   }
1076
1077   GST_LOG_OBJECT (stream->pad, "Inbuf : size = %d, ts = %"GST_TIME_FORMAT", dur = %"GST_TIME_FORMAT,
1078       GST_BUFFER_SIZE(inbuf), GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(inbuf)), GST_TIME_ARGS(GST_BUFFER_DURATION(inbuf)));
1079
1080   // Queue the buffers till fragment cache reached... after reaching start pushing data to respective port
1081   if ( stream->frag_cnt < demux->fragments_cache ) {
1082     GST_LOG_OBJECT (demux, "queuing data till caching finished...");
1083     g_queue_push_tail (stream->queue, inbuf);
1084     return;
1085   }
1086
1087   if (!stream->sent_ns) {
1088     guint64 duration = GST_CLOCK_TIME_NONE;
1089     guint64 start = GST_CLOCK_TIME_NONE;
1090     GstEvent *event = NULL;
1091
1092     duration = gst_util_uint64_scale (GST_SSM_PARSE_GET_DURATION(demux->parser), GST_SECOND,
1093                 GST_SSM_PARSE_GET_TIMESCALE(demux->parser));
1094
1095     start = gst_util_uint64_scale (GST_SSM_PARSE_NS_START(demux->parser), GST_SECOND,
1096                 GST_SSM_PARSE_GET_TIMESCALE(demux->parser));
1097
1098     event = gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME,
1099                                         start, duration, start);
1100
1101     GST_DEBUG_OBJECT(demux," new_segment start = %"GST_TIME_FORMAT, GST_TIME_ARGS(start));
1102
1103     if (!gst_pad_push_event (stream->pad, event)) {
1104       GST_ERROR_OBJECT (demux, "failed to push newsegment event");
1105       return;
1106     }
1107     stream->sent_ns = TRUE;
1108   }
1109
1110   while (!g_queue_is_empty(stream->queue)) {
1111     GstBuffer *cache_buf = g_queue_pop_head (stream->queue);
1112
1113     fret = gst_pad_push (stream->pad, cache_buf);
1114     if (fret != GST_FLOW_OK) {
1115       GST_ERROR_OBJECT (demux, "failed to push data, reason : %s", gst_flow_get_name (fret));
1116       goto error;
1117     }
1118   }
1119
1120   if (stream->type == SS_STREAM_VIDEO && demux->ss_mode == SS_MODE_AONLY) {
1121     GST_BUFFER_TIMESTAMP (inbuf) = stream->switch_ts;
1122     GST_BUFFER_DURATION (inbuf) = ((float)1/25) * GST_SECOND;
1123     stream->switch_ts = GST_BUFFER_TIMESTAMP (inbuf) + GST_BUFFER_DURATION (inbuf);
1124     g_print ("Dummy buffers ts : %"GST_TIME_FORMAT" and dur : %"GST_TIME_FORMAT"\n",
1125                 GST_TIME_ARGS(GST_BUFFER_TIMESTAMP (inbuf)), GST_TIME_ARGS(GST_BUFFER_DURATION (inbuf)));
1126
1127     g_print ("caps : %s\n", gst_caps_to_string(GST_BUFFER_CAPS(inbuf)));
1128   }
1129
1130   /* push data to downstream*/
1131   fret = gst_pad_push (stream->pad, inbuf);
1132   if (fret != GST_FLOW_OK) {
1133     GST_ERROR_OBJECT (demux, "failed to push data, reason : %s", gst_flow_get_name (fret));
1134     goto error;
1135   }
1136
1137 error:
1138   return;
1139 }
1140
1141 static void
1142 gst_ss_demux_stop (GstSSDemux * demux, GstSSDemuxStream *stream)
1143 {
1144   if (GST_TASK_STATE (stream->stream_task) != GST_TASK_STOPPED)
1145     gst_task_stop (stream->stream_task);
1146 }
1147
1148 static void
1149 gst_ss_demux_stream_init (GstSSDemux *demux, GstSSDemuxStream *stream, SS_STREAM_TYPE stream_type)
1150 {
1151   stream->cond = g_cond_new ();
1152   stream->lock = g_mutex_new ();
1153   stream->queue = g_queue_new ();
1154   stream->parent = demux;
1155   stream->pipe = NULL;
1156   stream->urisrc = NULL;
1157   stream->parser = NULL;
1158   stream->sink = NULL;
1159   stream->frag_cnt = 0;
1160   stream->type = stream_type ;
1161   stream->uri = NULL;
1162   stream->start_ts = -1;
1163   stream->sent_ns = FALSE;
1164   stream->switch_ts = GST_CLOCK_TIME_NONE;
1165   stream->avg_dur = GST_CLOCK_TIME_NONE;
1166
1167   if (stream->type == SS_STREAM_VIDEO) {
1168     stream->pad = gst_pad_new_from_static_template (&ssdemux_videosrc_template, "video");
1169     stream->name = g_strdup("video");
1170   } else if (stream->type == SS_STREAM_AUDIO) {
1171     stream->pad = gst_pad_new_from_static_template (&ssdemux_audiosrc_template, "audio");
1172     stream->name = g_strdup("audio");
1173   } else if (stream->type == SS_STREAM_TEXT) {
1174     stream->pad = gst_pad_new_from_static_template (&ssdemux_subsrc_template, "subtitle");
1175     stream->name = g_strdup("text");
1176   }
1177
1178   GST_PAD_ELEMENT_PRIVATE (stream->pad) = stream;
1179
1180   gst_pad_use_fixed_caps (stream->pad);
1181   gst_pad_set_event_function (stream->pad, gst_ss_demux_handle_src_event);
1182   gst_pad_set_query_function (stream->pad, gst_ss_demux_handle_src_query);
1183
1184   stream->caps = ssm_parse_get_stream_caps (demux->parser, stream->type);
1185   g_print ("prepare video caps = %s", gst_caps_to_string(stream->caps));
1186
1187   GST_DEBUG_OBJECT (demux, "setting caps %" GST_PTR_FORMAT, stream->caps);
1188   gst_pad_set_caps (stream->pad, stream->caps);
1189
1190   GST_DEBUG_OBJECT (demux, "adding pad %s %p to demux %p", GST_OBJECT_NAME (stream->pad), stream->pad, demux);
1191   gst_pad_set_active (stream->pad, TRUE);
1192   gst_element_add_pad (GST_ELEMENT_CAST (demux), stream->pad);
1193 }
1194
1195 static void
1196 gst_ss_demux_stream_free (GstSSDemux * demux, GstSSDemuxStream * stream)
1197 {
1198   if (stream->queue) {
1199     while (!g_queue_is_empty(stream->queue)) {
1200       gst_buffer_unref (g_queue_pop_head (stream->queue));
1201     }
1202     g_queue_free (stream->queue);
1203     stream->queue = NULL;
1204   }
1205
1206   if (stream->pad) {
1207     gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->pad);
1208     stream->pad = NULL;
1209   }
1210   if (stream->cond) {
1211     g_cond_free (stream->cond);
1212     stream->cond = NULL;
1213   }
1214   if (stream->lock) {
1215     g_mutex_free (stream->lock);
1216     stream->lock = NULL;
1217   }
1218
1219   g_free (stream);
1220 }
1221 static gboolean
1222 ssdemux_init (GstPlugin * plugin)
1223 {
1224   if (!gst_element_register (plugin, "ssdemux", GST_RANK_PRIMARY,
1225           GST_TYPE_SS_DEMUX) || FALSE)
1226     return FALSE;
1227   return TRUE;
1228 }
1229
1230 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
1231     GST_VERSION_MINOR,
1232     "ssdemux",
1233     "Smooth streaming demux plugin",
1234     ssdemux_init, VERSION, "LGPL", PACKAGE_NAME, "http://www.samsung.com/")
1235