tizen 2.3.1 release
[framework/multimedia/gst-plugins-ext0.10.git] / ssdemux / src / gstssdemux.c
1
2 #ifdef HAVE_CONFIG_H
3 #  include "config.h"
4 #endif
5
6 /* FIXME 0.11: suppress warnings for deprecated API such as GStaticRecMutex
7  * with newer GLib versions (>= 2.31.0) */
8 #define GLIB_DISABLE_DEPRECATION_WARNINGS
9
10 #include <string.h>
11 //#include <gst/glib-compat-private.h>
12 #include "gstssdemux.h"
13
14 enum
15 {
16   PROP_0,
17   PROP_COOKIES,
18   PROP_ALLOW_AUDIO_ONLY,
19   PROP_CACHE_TIME,
20   PROP_LOW_PERCENTAGE,
21   PROP_HIGH_PERCENTAGE,
22   PROP_BITRATE_SWITCH_TOLERANCE,
23   PROP_LAST
24 };
25
26 static GstStaticPadTemplate ssdemux_videosrc_template =
27 GST_STATIC_PAD_TEMPLATE ("video",
28     GST_PAD_SRC,
29     GST_PAD_SOMETIMES,
30     GST_STATIC_CAPS_ANY);
31
32 static GstStaticPadTemplate ssdemux_audiosrc_template =
33 GST_STATIC_PAD_TEMPLATE ("audio",
34     GST_PAD_SRC,
35     GST_PAD_SOMETIMES,
36     GST_STATIC_CAPS_ANY);
37
38 static GstStaticPadTemplate ssdemux_subsrc_template =
39 GST_STATIC_PAD_TEMPLATE ("subtitle",
40     GST_PAD_SRC,
41     GST_PAD_SOMETIMES,
42     GST_STATIC_CAPS_ANY);
43
44 static GstStaticPadTemplate ssdemux_sink_template =
45 GST_STATIC_PAD_TEMPLATE ("sink",
46     GST_PAD_SINK,
47     GST_PAD_ALWAYS,
48     GST_STATIC_CAPS ("application/x-ss")); // Need to decide source mimetype
49
50 GST_DEBUG_CATEGORY_STATIC (gst_ss_demux_debug);
51 #define GST_CAT_DEFAULT gst_ss_demux_debug
52
53 #undef SIMULATE_AUDIO_ONLY /* enable to simulate audio only case forcibly */
54
55 static void
56 _do_init (GType type)
57 {
58   GST_DEBUG_CATEGORY_INIT (gst_ss_demux_debug, "ssdemux", 0, "ssdemux element");
59 }
60
61 GST_BOILERPLATE_FULL (GstSSDemux, gst_ss_demux, GstElement,
62     GST_TYPE_ELEMENT, _do_init);
63
64 #define DEFAULT_CACHE_TIME 6*GST_SECOND
65 #define DEFAULT_BITRATE_SWITCH_TOLERANCE 0.4
66 #define DEFAULT_LOW_PERCENTAGE 1
67 #define DEFAULT_HIGH_PERCENTAGE 99
68
69 struct _GstSSDemuxStream
70 {
71   /* Streaming task */
72   void *parent;
73   GstPad *pad;
74   gchar *name;
75   SS_STREAM_TYPE type;
76   GstTask *stream_task;
77   GStaticRecMutex stream_lock;
78   GstElement *pipe;
79   GstElement *urisrc;
80   GstElement *parser;
81   GstElement *sink;
82   GstBus *bus;
83   GMutex *lock;
84   GCond *cond;
85   GMutex *queue_lock;
86   GCond *queue_full;
87   GCond *queue_empty;
88   guint frag_cnt;
89   GQueue *queue;
90   gchar *uri;
91   guint64 start_ts;
92   gboolean sent_ns;
93   GstCaps *caps;
94   guint64 switch_ts;
95   guint64 avg_dur;
96   gboolean is_buffering;
97   gint64 percent;
98   gboolean rcvd_percent;
99   guint64 push_block_time;
100   gint64 cached_duration;
101
102   /* for fragment download rate calculation */
103   guint64 download_start_ts;
104   guint64 download_stop_ts;
105   guint64 download_size;
106
107 };
108
109 static void gst_ss_demux_set_property (GObject * object, guint prop_id,
110     const GValue * value, GParamSpec * pspec);
111 static void gst_ss_demux_get_property (GObject * object, guint prop_id,
112         GValue * value, GParamSpec * pspec);
113 static gboolean gst_ss_demux_sink_event (GstPad * pad, GstEvent * event);
114 static GstStateChangeReturn gst_ss_demux_change_state (GstElement * element, GstStateChange transition);
115 static void gst_ss_demux_dispose (GObject * obj);
116 static GstFlowReturn gst_ss_demux_chain (GstPad * pad, GstBuffer * buf);
117 static void gst_ss_demux_stream_loop (GstSSDemux * demux);
118 static gboolean gst_ss_demux_download_bus_cb(GstBus *bus, GstMessage *msg, gpointer data);
119 static void gst_ss_demux_stream_init (GstSSDemux *demux, GstSSDemuxStream *stream, SS_STREAM_TYPE stream_type);
120 static void gst_ss_demux_stream_free (GstSSDemux * demux, GstSSDemuxStream * stream);
121 static void gst_ssm_demux_on_new_buffer (GstElement * appsink, void* data);
122 static gboolean gst_ss_demux_download_fragment (GstSSDemux *demux, GstSSDemuxStream *stream, const gchar * uri, guint64 start_ts);
123 static gboolean gst_ss_demux_create_download_pipe (GstSSDemux * demux, GstSSDemuxStream *stream, const gchar * uri, guint64 start_ts);
124 static void gst_ss_demux_stop (GstSSDemux * demux, GstSSDemuxStream *stream);
125 static gboolean gst_ss_demux_create_dummy_pipe (GstSSDemux * demux, GstSSDemuxStream *stream);
126 static gboolean gst_ss_demux_create_dummy_sender(GstSSDemux *demux, GstSSDemuxStream *stream);
127 static void gst_ss_demux_push_loop (GstSSDemuxStream *stream);
128 static void gst_ss_demux_update_buffering (GstSSDemuxStream *stream, guint64 percent);
129
130 static void
131 gst_ss_demux_base_init (gpointer g_class)
132 {
133   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
134
135   gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&ssdemux_videosrc_template));
136   gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&ssdemux_audiosrc_template));
137   gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&ssdemux_subsrc_template));
138   gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&ssdemux_sink_template));
139
140   gst_element_class_set_details_simple (element_class,
141       "SS Demuxer",
142       "Demuxer/URIList",
143       "Smooth Streaming demuxer",
144       "Naveen Cherukuri<naveen.ch@samsung.com>");
145 }
146
147 static void
148 gst_ss_demux_class_init (GstSSDemuxClass * klass)
149 {
150   GObjectClass *gobject_class;
151   GstElementClass *gstelement_class;
152
153   gobject_class = (GObjectClass *) klass;
154   gstelement_class = (GstElementClass *) klass;
155
156   gobject_class->set_property = gst_ss_demux_set_property;
157   gobject_class->get_property = gst_ss_demux_get_property;
158   gobject_class->dispose = gst_ss_demux_dispose;
159
160   /* to share cookies with other sessions */
161   g_object_class_install_property (gobject_class, PROP_COOKIES,
162       g_param_spec_boxed ("cookies", "Cookies", "HTTP request cookies",
163           G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
164
165   /* will be considered only in LIVE case */
166   g_object_class_install_property (gobject_class, PROP_ALLOW_AUDIO_ONLY,
167       g_param_spec_boolean ("allow-audio-only", "Allow audio only when downloadrate is less in live case",
168           "Allow audio only stream download in live case when download rate is less",
169           TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
170
171   g_object_class_install_property (gobject_class, PROP_CACHE_TIME,
172       g_param_spec_uint64 ("max-cache-time", "caching time",
173           "amount of data that can be cached in seconds", 0, G_MAXUINT64,
174           DEFAULT_CACHE_TIME,
175           G_PARAM_READWRITE));
176
177   g_object_class_install_property (gobject_class, PROP_LOW_PERCENTAGE,
178       g_param_spec_int ("low-percent", "Low Percent",
179           "Low threshold to start buffering",
180           1, 100, DEFAULT_LOW_PERCENTAGE,
181           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
182
183   g_object_class_install_property (gobject_class, PROP_HIGH_PERCENTAGE,
184       g_param_spec_int ("high-percent", "High percent",
185           "High threshold to complete buffering",
186           2, 100, DEFAULT_HIGH_PERCENTAGE,
187           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
188
189   g_object_class_install_property (gobject_class, PROP_BITRATE_SWITCH_TOLERANCE,
190       g_param_spec_float ("bitrate-switch-tolerance",
191           "Bitrate switch tolerance",
192           "Tolerance with respect of the fragment duration to switch to "
193           "a different bitrate if the client is too slow/fast.",
194           0, 1, DEFAULT_BITRATE_SWITCH_TOLERANCE,
195           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196
197   gstelement_class->change_state =
198       GST_DEBUG_FUNCPTR (gst_ss_demux_change_state);
199 }
200
201 static void
202 gst_ss_demux_init (GstSSDemux * demux, GstSSDemuxClass * klass)
203 {
204   /* sink pad */
205   demux->sinkpad = gst_pad_new_from_static_template (&ssdemux_sink_template, "sink");
206   gst_pad_set_chain_function (demux->sinkpad,
207       GST_DEBUG_FUNCPTR (gst_ss_demux_chain));
208   gst_pad_set_event_function (demux->sinkpad,
209       GST_DEBUG_FUNCPTR (gst_ss_demux_sink_event));
210   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
211
212   demux->max_cache_time = DEFAULT_CACHE_TIME;
213   demux->cookies = NULL;
214   demux->ss_mode = SS_MODE_NO_SWITCH;
215   demux->switch_eos = FALSE;
216   demux->allow_audio_only = FALSE;
217   demux->percent = 100;
218   demux->low_percent = DEFAULT_LOW_PERCENTAGE;
219   demux->high_percent = DEFAULT_HIGH_PERCENTAGE;
220   demux->eos = FALSE;
221 }
222
223 static void
224 gst_ss_demux_dispose (GObject * obj)
225 {
226   GstSSDemux *demux = GST_SS_DEMUX (obj);
227   int n =0;
228
229   for (n = 0; n < SS_STREAM_NUM; n++) {
230     if (demux->streams[n]) {
231       gst_pad_stop_task ((demux->streams[n])->pad);
232       g_print ("\n\n\nstopped the TASK\n\n\n");
233       gst_ss_demux_stream_free (demux, demux->streams[n]);
234       demux->streams[n] = NULL;
235     }
236   }
237
238   if (demux->parser) {
239     gst_ssm_parse_free (demux->parser);
240     demux->parser = NULL;
241   }
242
243   G_OBJECT_CLASS (parent_class)->dispose (obj);
244 }
245
246 static void
247 gst_ss_demux_set_property (GObject * object, guint prop_id,
248     const GValue * value, GParamSpec * pspec)
249 {
250   GstSSDemux *demux = GST_SS_DEMUX (object);
251
252   switch (prop_id) {
253      case PROP_COOKIES:
254       g_strfreev (demux->cookies);
255       demux->cookies = g_strdupv (g_value_get_boxed (value));
256       break;
257     case PROP_ALLOW_AUDIO_ONLY:
258       demux->allow_audio_only = g_value_get_boolean (value);
259       break;
260     case PROP_CACHE_TIME:
261       demux->max_cache_time = g_value_get_uint64 (value);
262       break;
263     case PROP_LOW_PERCENTAGE:
264       demux->low_percent = g_value_get_int (value);
265       break;
266     case PROP_HIGH_PERCENTAGE:
267       demux->high_percent = g_value_get_int (value);
268       break;
269     case PROP_BITRATE_SWITCH_TOLERANCE:
270       demux->bitrate_switch_tol = g_value_get_float (value);
271       break;
272     default:
273       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
274       break;
275   }
276 }
277
278 static void
279 gst_ss_demux_get_property (GObject * object, guint prop_id, GValue * value,
280     GParamSpec * pspec)
281 {
282   GstSSDemux *demux = GST_SS_DEMUX (object);
283
284   switch (prop_id) {
285     case PROP_COOKIES:
286       g_value_set_boxed (value, g_strdupv (demux->cookies));
287       break;
288     case PROP_ALLOW_AUDIO_ONLY:
289       g_value_set_boolean (value, demux->allow_audio_only);
290       break;
291     case PROP_CACHE_TIME:
292       g_value_set_uint64 (value, demux->max_cache_time);
293       break;
294     case PROP_LOW_PERCENTAGE:
295       g_value_set_int (value, demux->low_percent);
296       break;
297     case PROP_HIGH_PERCENTAGE:
298       g_value_set_int (value, demux->high_percent);
299       break;
300     case PROP_BITRATE_SWITCH_TOLERANCE:
301       g_value_set_float (value, demux->bitrate_switch_tol);
302       break;
303     default:
304       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
305       break;
306   }
307 }
308
309 static gboolean
310 gst_ss_demux_sink_event (GstPad * pad, GstEvent * event)
311 {
312   GstSSDemux *demux = GST_SS_DEMUX (gst_pad_get_parent (pad));
313   GstQuery *query = NULL;
314   gboolean ret;
315   gchar *uri;
316
317   switch (event->type) {
318     case GST_EVENT_EOS: {
319       int i = 0;
320       if (demux->manifest == NULL) {
321         GST_ERROR_OBJECT (demux, "Received EOS without a manifest.");
322         goto error;
323       }
324
325       GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: mainifest file fetched");
326
327       query = gst_query_new_uri ();
328       ret = gst_pad_peer_query (demux->sinkpad, query);
329       if (ret) {
330         gst_query_parse_uri (query, &uri);
331         demux->parser = gst_ssm_parse_new (uri);
332         g_free (uri);
333       } else {
334         GST_ERROR_OBJECT (demux, "failed to query URI from upstream");
335         goto error;
336       }
337       gst_query_unref (query);
338       query = NULL;
339
340       GST_LOG_OBJECT (demux, "data = %p & size = %d", GST_BUFFER_DATA(demux->manifest), GST_BUFFER_SIZE(demux->manifest));
341       if (!gst_ssm_parse_manifest (demux->parser, (char *)GST_BUFFER_DATA(demux->manifest), GST_BUFFER_SIZE(demux->manifest))) {
342         /* In most cases, this will happen if we set a wrong url in the
343          * source element and we have received the 404 HTML response instead of
344          * the playlist */
345         GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid playlist."), (NULL));
346         goto error;
347       }
348
349       {
350         unsigned char *protection_data = NULL;
351         unsigned int protection_len = 0;
352
353         /* get protection-header from manifest parser */
354         ret = gst_ssm_parse_get_protection_header (demux->parser, &protection_data, &protection_len);
355         if (!ret) {
356           GST_ERROR_OBJECT (demux, "failed to get protection header...");
357           GST_ELEMENT_ERROR (demux, RESOURCE, NO_SPACE_LEFT, ("fragment allocation failed..."), (NULL));
358           goto error;
359         }
360
361         if (protection_data && protection_len) {
362           g_print ("Got the protection header...\n");
363           demux->protection_header = gst_buffer_new ();
364           GST_BUFFER_DATA (demux->protection_header) = GST_BUFFER_MALLOCDATA (demux->protection_header) = protection_data;
365           GST_BUFFER_SIZE (demux->protection_header) = protection_len;
366         }
367       }
368
369       for( i = 0; i < SS_STREAM_NUM; i++) {
370         if (gst_ssm_parse_check_stream (demux->parser, i)) {
371           GstSSDemuxStream *stream = g_new0 (GstSSDemuxStream, 1);
372
373           // Add pad emission of the stream
374           gst_ss_demux_stream_init (demux, stream, i);
375
376           if (!gst_pad_is_linked (stream->pad)) {
377             GST_WARNING_OBJECT (demux, "%s - stream pad is not linked...clean up", ssm_parse_get_stream_name(i));
378             gst_ss_demux_stream_free (demux, stream);
379             continue;
380           }
381
382           /* create stream task */
383           g_static_rec_mutex_init (&stream->stream_lock);
384           stream->stream_task = gst_task_create ((GstTaskFunction) gst_ss_demux_stream_loop, demux);
385           if (NULL == stream->stream_task) {
386             GST_ERROR_OBJECT (demux, "failed to create stream task...");
387             GST_ELEMENT_ERROR (demux, RESOURCE, FAILED, ("failed to create stream task"), (NULL));
388             goto error;
389           }
390           gst_task_set_lock (stream->stream_task, &stream->stream_lock);
391
392           /* create stream push loop */
393           if (!gst_pad_start_task (stream->pad, (GstTaskFunction) gst_ss_demux_push_loop, stream)) {
394             GST_ERROR_OBJECT (demux, "failed to create push loop...");
395             GST_ELEMENT_ERROR (demux, RESOURCE, FAILED, ("failed to create push loop"), (NULL));
396             goto error;
397           }
398
399           demux->streams[i] = stream;
400           g_print ("Starting stream - %d task loop...\n", i);
401           gst_task_start (stream->stream_task);
402         }
403       }
404
405       gst_event_unref (event);
406       gst_object_unref (demux);
407       return TRUE;
408     }
409     case GST_EVENT_NEWSEGMENT:
410       /* Swallow newsegments, we'll push our own */
411       gst_event_unref (event);
412       gst_object_unref (demux);
413       return TRUE;
414     default:
415       break;
416   }
417
418   return gst_pad_event_default (pad, event);
419
420 error:
421   // TODO: add closing
422
423   //gst_ss_demux_stop (demux);
424   gst_event_unref (event);
425   gst_object_unref (demux);
426
427   if (query)
428     gst_query_unref (query);
429
430   g_print ("Returning from sink event...\n");
431   return FALSE;
432
433 }
434
435 static gboolean
436 gst_ss_demux_handle_src_query (GstPad * pad, GstQuery * query)
437 {
438   gboolean res = FALSE;
439   GstSSDemux *ssdemux = GST_SS_DEMUX (gst_pad_get_parent (pad));
440
441   GST_LOG_OBJECT (pad, "%s query", GST_QUERY_TYPE_NAME (query));
442
443   // TODO: need to add other query types as well
444
445   switch (GST_QUERY_TYPE (query)) {
446     case GST_QUERY_DURATION:{
447       GstFormat fmt;
448
449       gst_query_parse_duration (query, &fmt, NULL);
450       if (fmt == GST_FORMAT_TIME) {
451         gint64 duration = -1;
452
453         duration = gst_util_uint64_scale (GST_SSM_PARSE_GET_DURATION(ssdemux->parser), GST_SECOND,
454                 GST_SSM_PARSE_GET_TIMESCALE(ssdemux->parser));
455         if (duration > 0) {
456           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
457           res = TRUE;
458         }
459       }
460       break;
461     }
462     default:
463       res = gst_pad_query_default (pad, query);
464       break;
465   }
466
467   gst_object_unref (ssdemux);
468
469   return res;
470 }
471
472
473 static gboolean
474 gst_ss_demux_handle_src_event (GstPad * pad, GstEvent * event)
475 {
476   GstSSDemux *demux = GST_SS_DEMUX (gst_pad_get_parent (pad));
477
478   switch (event->type) {
479     case GST_EVENT_SEEK:
480     {
481       gdouble rate;
482       GstFormat format;
483       GstSeekFlags flags;
484       GstSeekType start_type, stop_type;
485       gint64 start, stop;
486       gint i = 0;
487       GstSSDemuxStream *stream = NULL;
488
489       GST_INFO_OBJECT (demux, "Received GST_EVENT_SEEK");
490
491       // TODO: should be able to seek in DVR window
492       if (GST_SSM_PARSE_IS_LIVE_PRESENTATION (demux->parser)) {
493         GST_WARNING_OBJECT (demux, "Received seek event for live stream");
494         return FALSE;
495       }
496
497       gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
498           &stop_type, &stop);
499
500       if (format != GST_FORMAT_TIME) {
501         GST_WARNING_OBJECT (demux, "Only time format is supported in seek");
502         return FALSE;
503       }
504
505       GST_DEBUG_OBJECT (demux, "seek event, rate: %f start: %" GST_TIME_FORMAT
506           " stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start),
507           GST_TIME_ARGS (stop));
508
509
510       for( i = 0; i < SS_STREAM_NUM; i++) {
511         if (stream = demux->streams[i]) {
512           g_cond_signal (stream->cond);
513           gst_task_stop (stream->stream_task);
514         }
515       }
516
517       if (flags & GST_SEEK_FLAG_FLUSH) {
518         GST_INFO_OBJECT (demux, "sending flush start");
519
520         for( i = 0; i < SS_STREAM_NUM; i++) {
521           if (stream = demux->streams[i]) {
522             gst_pad_push_event (stream->pad, gst_event_new_flush_start ());
523           }
524         }
525       }
526
527       gst_ssm_parse_seek_manifest (demux->parser, start);
528
529       if (flags & GST_SEEK_FLAG_FLUSH) {
530         GST_INFO_OBJECT (demux, "sending flush stop");
531         for( i = 0; i < SS_STREAM_NUM; i++) {
532           if (stream = demux->streams[i]) {
533             gst_pad_push_event (stream->pad, gst_event_new_flush_stop ());
534             GST_LOG_OBJECT (stream->pad, "Starting pad TASK again...\n");
535             stream->sent_ns = FALSE;
536             stream->frag_cnt = 0; /*resetting to start buffering on SEEK */
537             gst_task_start (stream->stream_task);
538           }
539         }
540       }
541
542       return TRUE;
543     }
544     default:
545       break;
546   }
547
548   return gst_pad_event_default (pad, event);
549 }
550
551 static GstStateChangeReturn
552 gst_ss_demux_change_state (GstElement * element, GstStateChange transition)
553 {
554   GstStateChangeReturn ret;
555
556   switch (transition) {
557     case GST_STATE_CHANGE_READY_TO_PAUSED:
558       break;
559     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
560       break;
561     default:
562       break;
563   }
564
565   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
566
567   switch (transition) {
568     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
569       break;
570     case GST_STATE_CHANGE_PAUSED_TO_READY:
571       break;
572     default:
573       break;
574   }
575   return ret;
576 }
577
578
579 static GstFlowReturn
580 gst_ss_demux_chain (GstPad * pad, GstBuffer * buf)
581 {
582   GstSSDemux *demux = GST_SS_DEMUX (gst_pad_get_parent (pad));
583
584   if (demux->manifest == NULL)
585     demux->manifest = buf;
586   else
587     demux->manifest = gst_buffer_join (demux->manifest, buf);
588   gst_object_unref (demux);
589
590   return GST_FLOW_OK;
591 }
592
593
594 static gboolean
595 gst_ss_demux_get_next_fragment (GstSSDemux * demux, SS_STREAM_TYPE stream_type)
596 {
597   GstSSDemuxStream *stream = demux->streams[stream_type];
598   gchar *next_fragment_uri = NULL;
599   guint64 start_ts = 0;
600
601   if (!gst_ssm_parse_get_next_fragment_url (demux->parser, stream_type, &next_fragment_uri, &start_ts )) {
602     GST_INFO_OBJECT (demux, "This Manifest does not contain more fragments");
603     goto end_of_list;
604   }
605
606   GST_ERROR_OBJECT (demux, "Fetching next fragment %s", next_fragment_uri);
607
608   stream->uri = g_strdup(next_fragment_uri);
609   stream->start_ts = start_ts;
610
611   if (!gst_ss_demux_download_fragment (demux, stream, next_fragment_uri, start_ts)) {
612     GST_ERROR_OBJECT (demux, "failed to download fragment...");
613     goto error;
614   }
615
616   return TRUE;
617
618 error:
619   {
620     GST_ELEMENT_ERROR (demux, RESOURCE, FAILED, ("failed to download fragment"), (NULL));
621     gst_ss_demux_stop (demux, stream);
622     return FALSE;
623   }
624 end_of_list:
625   {
626     GST_INFO_OBJECT (demux, "Reached end of playlist, sending EOS");
627     demux->eos = TRUE;
628     gst_ss_demux_stop (demux, stream);
629     return TRUE;
630   }
631 }
632
633 static void
634 gst_ss_demux_push_loop (GstSSDemuxStream *stream)
635 {
636   GstBuffer *outbuf = NULL;
637   GstSSDemux *demux = stream->parent;
638   GstFlowReturn fret = GST_FLOW_OK;
639
640   // TODO: need to take care of EOS handling....
641
642   g_mutex_lock (stream->queue_lock);
643
644   if (g_queue_is_empty (stream->queue)) {
645     GST_DEBUG_OBJECT (stream->pad,"queue is empty wait till, some buffers are available...");
646     if (demux->eos) {
647       GST_INFO_OBJECT (stream->pad, "stream EOS, pause the task");
648       gst_pad_push_event (stream->pad, gst_event_new_eos ());
649       gst_pad_pause_task (stream->pad);
650       g_print ("Paused the task");
651       return;
652     }
653     g_cond_wait (stream->queue_empty, stream->queue_lock);
654   }
655
656   outbuf = g_queue_pop_head (stream->queue);
657
658   if (GST_BUFFER_DURATION_IS_VALID (outbuf)) {
659     stream->cached_duration -= GST_BUFFER_DURATION(outbuf);
660   } else {
661     g_print ("\nDuration field is not valid.. check this issue !!!!!!!!\n");
662     GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid duration of a frame"), (NULL));
663     g_mutex_unlock (stream->queue_lock);
664     return;
665   }
666
667   g_cond_signal (stream->queue_full);
668   //g_print ("[%s] Signalled full condition...\n", ssm_parse_get_stream_name(stream->type));
669   g_mutex_unlock (stream->queue_lock);
670
671   if (!stream->sent_ns) {
672     guint64 duration = GST_CLOCK_TIME_NONE;
673     guint64 start = GST_CLOCK_TIME_NONE;
674     GstEvent *event = NULL;
675
676     duration = gst_util_uint64_scale (GST_SSM_PARSE_GET_DURATION(demux->parser), GST_SECOND,
677                                       GST_SSM_PARSE_GET_TIMESCALE(demux->parser));
678
679     start = gst_util_uint64_scale (GST_SSM_PARSE_NS_START(demux->parser), GST_SECOND,
680                                    GST_SSM_PARSE_GET_TIMESCALE(demux->parser));
681
682     event = gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME, start, duration, start);
683
684     GST_DEBUG_OBJECT(demux," new_segment start = %"GST_TIME_FORMAT, GST_TIME_ARGS(start));
685
686     if (!gst_pad_push_event (stream->pad, event)) {
687       GST_ERROR_OBJECT (demux, "failed to push newsegment event");
688       return; // No need to close task for this, because sometimes pad can unlined
689     }
690     stream->sent_ns = TRUE;
691   }
692
693   if (stream->type == SS_STREAM_VIDEO && demux->ss_mode == SS_MODE_AONLY) {
694     GST_BUFFER_TIMESTAMP (outbuf) = stream->switch_ts;
695     GST_BUFFER_DURATION (outbuf) = ((float)1/25) * GST_SECOND;
696     stream->switch_ts = GST_BUFFER_TIMESTAMP (outbuf) + GST_BUFFER_DURATION (outbuf);
697     g_print ("Dummy buffers ts : %"GST_TIME_FORMAT" and dur : %"GST_TIME_FORMAT"\n",
698              GST_TIME_ARGS(GST_BUFFER_TIMESTAMP (outbuf)), GST_TIME_ARGS(GST_BUFFER_DURATION (outbuf)));
699     gchar *caps_string = gst_caps_to_string(GST_BUFFER_CAPS(outbuf));
700     g_print ("caps : %s\n", caps_string);
701     g_free(caps_string);
702     caps_string = NULL;
703   }
704
705   /* push data to downstream*/
706   fret = gst_pad_push (stream->pad, outbuf);
707   if (fret != GST_FLOW_OK) {
708     GST_ERROR_OBJECT (demux, "failed to push data, reason : %s", gst_flow_get_name (fret));
709     goto error;
710   }
711
712   //g_print ("[%s] pushed buffer\n", ssm_parse_get_stream_name(stream->type));
713 error:
714   // TODO: need to close task & post error to bus
715   return;
716 }
717
718 static void
719 gst_ss_demux_stream_loop (GstSSDemux * demux)
720 {
721   GThread *self = NULL;
722   int stream_type = 0;
723   GstSSDemuxStream *stream = NULL;
724
725   self = g_thread_self ();
726
727   for (stream_type = 0; stream_type < SS_STREAM_NUM; stream_type++) {
728     if (demux->streams[stream_type] && demux->streams[stream_type]->stream_task->abidata.ABI.thread == self) {
729       stream = demux->streams[stream_type];
730       break;
731     }
732   }
733
734   if (stream) {
735     /* download next fragment of stream_type */
736     if (!gst_ss_demux_get_next_fragment (demux, stream_type)) {
737       GST_ERROR_OBJECT (demux, "failed to get next fragment...");
738       goto error;
739     }
740   }
741
742   return;
743
744 error:
745   {
746     gst_task_pause (stream->stream_task);
747     GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
748           ("could not download fragments"), (NULL));
749       gst_ss_demux_stop (demux, stream);
750     return;
751   }
752 }
753
754 static gboolean
755 gst_ss_demux_download_fragment (GstSSDemux *demux, GstSSDemuxStream *stream, const gchar * uri, guint64 start_ts)
756 {
757   GstStateChangeReturn ret;
758   GTimeVal time = {0, };
759
760   g_print ("Going to download fragment : %s\n", uri);
761   if (!gst_ss_demux_create_download_pipe (demux, stream, uri, start_ts)) {
762     GST_ERROR_OBJECT (demux, "failed to create download pipeline");
763     return FALSE;
764   }
765
766   /* download rate calculation : note down start time*/
767   g_get_current_time (&time);
768   stream->download_start_ts = (time.tv_sec * 1000000)+ time.tv_usec;
769
770   ret = gst_element_set_state (stream->pipe, GST_STATE_PLAYING);
771   if (ret == GST_STATE_CHANGE_FAILURE) {
772     GST_ERROR_OBJECT (demux, "set_state failed...");
773     return FALSE;
774   }
775
776   if (stream->pipe && demux->ss_mode == SS_MODE_AONLY &&
777     stream->type == SS_STREAM_VIDEO) {
778
779     GST_DEBUG_OBJECT (demux, "Waiting to fetch the URI");
780     g_mutex_lock (stream->lock);
781     g_cond_wait (stream->cond, stream->lock);
782     GST_INFO_OBJECT (stream->pad, "Recived signal to shutdown...");
783     g_mutex_unlock (stream->lock);
784
785     /* put live pipeline to PAUSED state to unlink urisrc & piffdemux */
786     gst_element_set_state (stream->pipe, GST_STATE_NULL);
787     gst_element_get_state (stream->pipe, NULL, NULL, GST_CLOCK_TIME_NONE);
788
789     stream->pipe = NULL;
790
791     stream->switch_ts = stream->start_ts;
792
793     /* create dummy frame sender */
794     if (!gst_ss_demux_create_dummy_sender (demux, stream)) {
795       GST_ERROR_OBJECT (demux, "failed to create dummy sender pipeline...");
796       GST_ELEMENT_ERROR (demux, RESOURCE, FAILED, ("Unable to create dummy pipe."), (NULL));
797       return FALSE;
798     }
799   }
800
801   /* wait until:
802    *   - the download succeed (EOS)
803    *   - the download failed (Error message on the fetcher bus)
804    *   - the download was canceled
805    */
806   GST_DEBUG_OBJECT (demux, "Waiting to fetch the URI");
807   g_mutex_lock (stream->lock);
808   g_cond_wait (stream->cond, stream->lock);
809   GST_INFO_OBJECT (stream->pad, "Recived signal to shutdown...");
810   g_mutex_unlock (stream->lock);
811
812   gst_element_set_state (stream->pipe, GST_STATE_NULL);
813   gst_element_get_state (stream->pipe, NULL, NULL, GST_CLOCK_TIME_NONE);
814   stream->pipe = NULL;
815
816   return TRUE;
817 }
818
819 static gboolean
820 gst_ss_demux_create_dummy_sender(GstSSDemux *demux, GstSSDemuxStream *stream)
821 {
822   GstStateChangeReturn ret;
823
824   if (!gst_ss_demux_create_dummy_pipe (demux, stream)) {
825     GST_ERROR_OBJECT (demux, "failed to create download pipeline");
826     return FALSE;
827   }
828
829   ret = gst_element_set_state (stream->pipe, GST_STATE_PLAYING);
830   if (ret == GST_STATE_CHANGE_FAILURE) {
831     GST_ERROR_OBJECT (demux, "set_state failed...");
832     return FALSE;
833   }
834
835 #if 0
836   GST_DEBUG_OBJECT (demux, "Waiting to download next video URI");
837   g_mutex_lock (stream->lock);
838   g_cond_wait (stream->cond, stream->lock);
839   if (stream->pipe) {
840     gst_element_set_state (stream->pipe, GST_STATE_NULL);
841     gst_element_get_state (stream->pipe, NULL, NULL, GST_CLOCK_TIME_NONE);
842     stream->pipe = NULL;
843   }
844   g_mutex_unlock (stream->lock);
845 #endif
846
847   return TRUE;
848 }
849
850 static void
851 gst_ss_demux_append_live_params(GstElement *piffparser, piff_live_param_t *param, gpointer data)
852 {
853   GstSSDemuxStream *stream = (GstSSDemuxStream *)data;
854   GstSSDemux *demux = stream->parent;
855   int i =0;
856   guint64 timestamp = 0;
857   guint64 duration = 0;
858
859   GST_LOG_OBJECT (demux, "received signal structs count = %d\n", param->count);
860
861   for (i = 0 ; i< param->count; i++) {
862     if (param->long_info) {
863       piff_fragment_longtime_info *info = &(param->long_info[i]);
864       timestamp = info->ts;
865       duration = info->duration;
866     } else if (param->info) {
867       piff_fragment_time_info *info = &(param->info[i]);
868       timestamp = info->ts;
869       duration = info->duration;
870     }
871
872     GST_LOG_OBJECT (demux, "Received ts = %llu and dur = %llu\n", timestamp, duration);
873
874     if (!gst_ssm_parse_append_next_fragment (demux->parser, stream->type, timestamp, duration)) {
875       GST_ERROR_OBJECT (demux, "failed to append new fragment");
876       GST_ELEMENT_ERROR (demux, RESOURCE, NO_SPACE_LEFT, ("fragment allocation failed..."), (NULL));
877       return;
878     }
879   }
880
881   if (param->long_info) {
882     free (param->long_info);
883     param->long_info = NULL;
884   }
885
886   if (param->info) {
887     free (param->info);
888     param->info = NULL;
889   }
890
891   free (param);
892
893   if ((stream->type == SS_STREAM_VIDEO) && (demux->ss_mode == SS_MODE_AONLY)) {
894     g_print ("\n\n\t\tSignalling download pipe shutdonw....\n\n");
895
896     g_object_get (stream->parser, "frame-dur", &stream->avg_dur, NULL);
897     g_print ("frame duration = %"GST_TIME_FORMAT"\n\n\n", GST_TIME_ARGS(stream->avg_dur));
898     g_cond_signal (stream->cond);
899   }
900
901 }
902
903 static gboolean
904 gst_ss_demux_create_download_pipe (GstSSDemux * demux, GstSSDemuxStream *stream, const gchar * uri, guint64 start_ts)
905 {
906   gchar *name = NULL;
907   gchar *caps_string = NULL;
908
909   if (!gst_uri_is_valid (uri))
910     return FALSE;
911
912   name = g_strdup_printf("%s-%s", stream->name, "downloader");
913
914   stream->pipe = gst_pipeline_new (name);
915   if (!stream->pipe) {
916     GST_ERROR_OBJECT (demux, "failed to create pipeline");
917     g_free(name);
918     name = NULL;
919     return FALSE;
920   }
921
922   name = g_strdup_printf("%s-%s", stream->name, "httpsrc");
923   GST_DEBUG ("Creating source element for the URI:%s", uri);
924   stream->urisrc = gst_element_make_from_uri (GST_URI_SRC, uri, name);
925   if (!stream->urisrc) {
926     GST_ERROR_OBJECT (demux, "failed to create urisrc");
927     g_free(name);
928     return FALSE;
929   }
930
931   if (GST_SSM_PARSE_IS_LIVE_PRESENTATION(demux->parser))
932     g_object_set (G_OBJECT (stream->urisrc), "is-live", TRUE, NULL);
933   else
934     g_object_set (G_OBJECT (stream->urisrc), "is-live", FALSE, NULL);
935
936   name = g_strdup_printf("%s-%s", stream->name, "parser");
937   stream->parser = gst_element_factory_make ("piffdemux", name);
938   if (!stream->parser) {
939     GST_ERROR_OBJECT (demux, "failed to create piffdemux element");
940     g_free(name);
941     name = NULL;
942     return FALSE;
943   }
944
945   if (stream->caps)
946     gst_caps_unref (stream->caps);
947
948   stream->caps = ssm_parse_get_stream_caps (demux->parser, stream->type);
949   caps_string = gst_caps_to_string(stream->caps);
950   GST_INFO_OBJECT (stream->pad, "prepare caps = %s", caps_string);
951   g_free(caps_string);
952   caps_string = NULL;
953
954   g_object_set (G_OBJECT (stream->parser), "caps", stream->caps, NULL);
955   g_object_set (G_OBJECT (stream->parser), "start-ts", start_ts, NULL);
956   g_object_set (G_OBJECT (stream->parser), "duration", GST_SSM_PARSE_GET_DURATION(demux->parser), NULL);
957   g_object_set (G_OBJECT (stream->parser), "is-live", GST_SSM_PARSE_IS_LIVE_PRESENTATION(demux->parser), NULL);
958   g_object_set (G_OBJECT (stream->parser), "lookahead-count", GST_SSM_PARSE_LOOKAHEAD_COUNT(demux->parser), NULL);
959   if (demux->protection_header)
960     g_object_set (G_OBJECT (stream->parser), "protection-header", demux->protection_header, NULL);
961   g_signal_connect (stream->parser, "live-param",  G_CALLBACK (gst_ss_demux_append_live_params), stream);
962
963   name = g_strdup_printf("%s-%s", stream->name, "sink");
964   stream->sink = gst_element_factory_make ("appsink", name);
965   if (!stream->sink) {
966     GST_ERROR_OBJECT (demux, "failed to create appsink element");
967     g_free(name);
968     name = NULL;
969     return FALSE;
970   }
971   g_object_set (G_OBJECT (stream->sink), "emit-signals", TRUE, "sync", FALSE, NULL);
972   g_signal_connect (stream->sink, "new-buffer",  G_CALLBACK (gst_ssm_demux_on_new_buffer), stream);
973
974   gst_bin_add_many (GST_BIN (stream->pipe), stream->urisrc, stream->parser, stream->sink, NULL);
975   if (!gst_element_link_many (stream->urisrc, stream->parser, stream->sink, NULL)) {
976     GST_ERROR ("failed to link elements...");
977     return FALSE;
978   }
979
980   stream->bus = gst_pipeline_get_bus (GST_PIPELINE (stream->pipe));
981   gst_bus_add_watch (stream->bus, (GstBusFunc)gst_ss_demux_download_bus_cb, stream);
982   gst_object_unref (stream->bus);
983
984   g_free(name);
985   name = NULL;
986
987   return TRUE;
988 }
989
990 #if 0
991 static gboolean
992 gst_ss_demux_create_dummy_pipe (GstSSDemux * demux, GstSSDemuxStream *stream)
993 {
994   gchar *name = NULL;
995   GstCaps *caps = NULL;
996   GstElement *capsfilter = NULL;
997   GstElement *enc = NULL;
998   guint64 avg_dur = -1;
999   guint frame_rate = 0;
1000
1001   name = g_strdup_printf("%s-%s", stream->name, "dummy");
1002
1003   stream->pipe = gst_pipeline_new (name);
1004   if (!stream->pipe) {
1005     GST_ERROR_OBJECT (demux, "failed to create pipeline");
1006     return FALSE;
1007   }
1008   g_free(name);
1009
1010   /* create dummy sender source */
1011   name = g_strdup_printf("%s-%s", stream->name, "dummysrc");
1012   stream->urisrc = gst_element_factory_make ("imagereader", name);
1013   if (!stream->urisrc) {
1014     GST_ERROR_OBJECT (demux,"failed to create filesrc element");
1015     return FALSE;
1016   }
1017   g_free(name);
1018   g_object_set (G_OBJECT (stream->urisrc), "location", "/opt/home/root/aonly_VGA_1frame_I420.yuv", NULL);
1019   g_object_set (G_OBJECT (stream->urisrc), "framerate", 25, NULL);
1020   g_object_set (G_OBJECT (stream->urisrc), "num-buffers", 60, NULL);
1021
1022   /* caps filter */
1023   capsfilter = gst_element_factory_make ("capsfilter", NULL);
1024   if (!capsfilter) {
1025     GST_ERROR_OBJECT (demux, "failed to create capsfilter element");
1026     return FALSE;
1027   }
1028   caps = gst_caps_new_simple ("video/x-raw-yuv",
1029                   "width", G_TYPE_INT, 640,
1030                   "height", G_TYPE_INT, 480,
1031                   "framerate",GST_TYPE_FRACTION, 25,1,
1032                   "format", GST_TYPE_FOURCC, GST_MAKE_FOURCC ('I', '4', '2', '0'),
1033                   NULL);
1034   g_object_set (G_OBJECT (capsfilter), "caps", caps,  NULL);
1035
1036   /* create h264parse element */
1037   enc = gst_element_factory_make ("savsenc_h264", "H264 encoder");
1038   if (!enc) {
1039     GST_ERROR_OBJECT (demux, "failed to create h264 parse element");
1040     return FALSE;
1041   }
1042   name = g_strdup_printf("%s-%s", stream->name, "sink");
1043   stream->sink = gst_element_factory_make ("appsink", name);
1044   if (!stream->sink) {
1045     GST_ERROR_OBJECT (demux, "failed to create appsink element");
1046     return FALSE;
1047   }
1048   g_free(name);
1049   g_object_set (G_OBJECT (stream->sink), "emit-signals", TRUE, "sync", FALSE, NULL);
1050   g_signal_connect (stream->sink, "new-buffer",  G_CALLBACK (gst_ssm_demux_on_new_buffer), stream);
1051
1052   /* add to pipeline & link all elements */
1053   gst_bin_add_many (GST_BIN (stream->pipe), stream->urisrc, capsfilter, enc, stream->sink, NULL);
1054
1055   if (!gst_element_link_many (stream->urisrc, capsfilter, enc, stream->sink, NULL)) {
1056     GST_ERROR_OBJECT (demux,"failed to link dummy pipe elements...");
1057     return FALSE;
1058   }
1059
1060   stream->bus = gst_pipeline_get_bus (GST_PIPELINE (stream->pipe));
1061   gst_bus_add_watch (stream->bus, (GstBusFunc)gst_ss_demux_download_bus_cb, stream);
1062   gst_object_unref (stream->bus);
1063
1064   return TRUE;
1065 }
1066 #else
1067 static gboolean
1068 gst_ss_demux_create_dummy_pipe (GstSSDemux * demux, GstSSDemuxStream *stream)
1069 {
1070   gchar *name = NULL;
1071   GstBus *bus = NULL;
1072   GstCaps *caps = NULL;
1073
1074   name = g_strdup_printf("%s-%s", stream->name, "dummy");
1075
1076   stream->pipe = gst_pipeline_new (name);
1077   if (!stream->pipe) {
1078     GST_ERROR_OBJECT (demux, "failed to create pipeline");
1079     return FALSE;
1080   }
1081   g_free(name);
1082
1083   /* create dummy sender source */
1084   name = g_strdup_printf("%s-%s", stream->name, "dummysrc");
1085   stream->urisrc = gst_element_factory_make ("filesrc", name);
1086   if (!stream->urisrc) {
1087     GST_ERROR_OBJECT (demux,"failed to create filesrc element");
1088     return FALSE;
1089   }
1090   g_free(name);
1091   g_object_set (G_OBJECT (stream->urisrc), "location", "/opt/home/root/sound_2sec.264", NULL);
1092
1093   /* create appsink element */
1094   name = g_strdup_printf("%s-%s", stream->name, "parser");
1095   stream->parser= gst_element_factory_make ("legacyh264parse", name);
1096   if (!stream->parser) {
1097     GST_ERROR_OBJECT (demux, "failed to create h264 parse element");
1098     return FALSE;
1099   }
1100   g_object_set (G_OBJECT (stream->parser), "output-format", 1, NULL);
1101
1102   /* create appsink element */
1103   name = g_strdup_printf("%s-%s", stream->name, "sink");
1104   stream->sink = gst_element_factory_make ("appsink", name);
1105   if (!stream->sink) {
1106     GST_ERROR_OBJECT (demux, "failed to create appsink element");
1107     return FALSE;
1108   }
1109   g_object_set (G_OBJECT (stream->sink), "emit-signals", TRUE, "sync", FALSE, NULL);
1110
1111   caps = gst_caps_new_simple ("video/x-h264",
1112                   "width", G_TYPE_INT, 640,
1113                   "height", G_TYPE_INT, 480,
1114                   "stream-format", G_TYPE_STRING, "byte-stream",
1115                   NULL);
1116   g_object_set (G_OBJECT (stream->sink), "caps", caps, NULL);
1117
1118   g_signal_connect (stream->sink, "new-buffer",  G_CALLBACK (gst_ssm_demux_on_new_buffer), stream);
1119   g_free(name);
1120
1121   /* add to pipeline & link all elements */
1122   gst_bin_add_many (GST_BIN (stream->pipe), stream->urisrc, stream->parser, stream->sink, NULL);
1123   if (!gst_element_link_many (stream->urisrc, stream->parser, stream->sink, NULL)) {
1124     GST_ERROR_OBJECT (demux,"failed to link elements...");
1125     return FALSE;
1126   }
1127
1128   bus = gst_pipeline_get_bus (GST_PIPELINE (stream->pipe));
1129   gst_bus_add_watch (bus, (GstBusFunc)gst_ss_demux_download_bus_cb, stream);
1130   gst_object_unref (bus);
1131
1132   return TRUE;
1133 }
1134
1135
1136 #endif
1137 static gboolean
1138 gst_ss_demux_download_bus_cb(GstBus *bus, GstMessage *msg, gpointer data)
1139 {
1140   GstSSDemuxStream *stream = (GstSSDemuxStream *)data;
1141   GstSSDemux *demux = stream->parent;
1142
1143   switch (GST_MESSAGE_TYPE(msg)) {
1144     case GST_MESSAGE_EOS: {
1145       GTimeVal time = {0, };
1146       gint idx = 0;
1147       guint64 total_push_time = 0;
1148       guint64 download_rate = 0;
1149
1150       GST_INFO_OBJECT (stream->pad, "received EOS on download pipe..");
1151       // increase the fragment count on EOS
1152       stream->frag_cnt++;
1153
1154       /* download rate calculation : note down start time*/
1155       g_get_current_time (&time);
1156       stream->download_stop_ts = (time.tv_sec * 1000000)+ time.tv_usec;
1157
1158       download_rate = ((stream->download_size * 8 * 1000000) / (stream->download_stop_ts - stream->download_start_ts - stream->push_block_time));
1159       g_print("*********** '%s' download rate = %"G_GUINT64_FORMAT" bpssss **************\n", stream->name, download_rate);
1160       stream->download_size = 0;
1161       stream->download_stop_ts = stream->download_start_ts = 0;
1162       stream->push_block_time = 0;
1163
1164       if ((stream->type == SS_STREAM_VIDEO) && (demux->ss_mode != SS_MODE_AONLY)) {
1165         if (!stream->is_buffering) {
1166           /* for switching, we are considering video download rate only */
1167           demux->ss_mode = gst_ssm_parse_switch_qualitylevel (demux->parser, download_rate);
1168         }
1169       } else if (stream->type == SS_STREAM_AUDIO && (demux->ss_mode == SS_MODE_AONLY)) {
1170         /* when video is not present using audio download rate to calculate switching */
1171          demux->ss_mode = gst_ssm_parse_switch_qualitylevel (demux->parser, download_rate);
1172          if (demux->ss_mode != SS_MODE_AONLY) {
1173            g_print ("\n\nMoving to AV mode by audio considering audio download rate\n\n\n\n");
1174          }
1175       }
1176
1177       g_cond_signal (stream->cond);
1178
1179 #ifdef SIMULATE_AUDIO_ONLY
1180       /* when fragment count is multiple of 4, switch to audio only case */
1181       if ((stream->frag_cnt % 4 == 0) && (stream->type == SS_STREAM_VIDEO) &&
1182                 GST_SSM_PARSE_IS_LIVE_PRESENTATION(demux->parser)) {
1183         g_print ("\n\t ######## Forcibly switching to audio only for testing ##########\n");
1184         demux->ss_mode = SS_MODE_AONLY;
1185       }
1186   #endif
1187       GST_DEBUG_OBJECT (stream->pad, "Signalling eos condition...");
1188
1189       GST_DEBUG_OBJECT (demux, "number of fragments downloaded = %d", stream->frag_cnt);
1190       break;
1191     }
1192     case GST_MESSAGE_ERROR: {
1193       GError *error = NULL;
1194       gchar* debug = NULL;
1195
1196       g_print ("Error from %s\n", gst_element_get_name (GST_MESSAGE_SRC(msg)));
1197
1198       gst_message_parse_error( msg, &error, &debug);
1199       if (error)
1200         GST_ERROR_OBJECT (demux, "GST_MESSAGE_ERROR: error= %s\n", error->message);
1201
1202       GST_ERROR_OBJECT (demux, "GST_MESSAGE_ERROR: debug = %s\n", debug);
1203
1204       /* handling error, when client requests url, which is yet to be prepared by server */
1205       if (GST_IS_URI_HANDLER(GST_MESSAGE_SRC(msg))) {
1206         GstStateChangeReturn ret;
1207
1208         /* wait for 1sec & request the url again */
1209         // TODO: need to make wait time as generic or Adding loop count to request again & again
1210         if (error)
1211           GST_INFO_OBJECT (demux, "ERROR : code = %d, msg = %s, NEED to request again", error->code, error->message);
1212
1213         usleep (1000000); // 1 sec
1214
1215         /* put the current pipeline to NULL state */
1216         gst_element_set_state (stream->pipe, GST_STATE_NULL);
1217         gst_element_get_state (stream->pipe, NULL, NULL, GST_CLOCK_TIME_NONE);
1218         stream->pipe = stream->urisrc = stream->parser = stream->sink = NULL;
1219
1220         g_print ("Going to download fragment AGAIN : %s\n", stream->uri);
1221         if (!gst_ss_demux_create_download_pipe (demux, stream, stream->uri, stream->start_ts)) {
1222           GST_ERROR_OBJECT (demux, "failed to create download pipeline");
1223           if (!gst_element_post_message (GST_ELEMENT(demux), msg)) {
1224             GST_ERROR_OBJECT (demux, "failed to post error");
1225             g_free(debug);
1226             debug = NULL;
1227
1228             return FALSE;
1229           }
1230         }
1231
1232         ret = gst_element_set_state (stream->pipe, GST_STATE_PLAYING);
1233         if (ret == GST_STATE_CHANGE_FAILURE) {
1234           if (!gst_element_post_message (GST_ELEMENT(demux), msg)) {
1235             GST_ERROR_OBJECT (demux, "failed to post error");
1236             return FALSE;
1237           }
1238         }
1239
1240         } else {
1241           if (error)
1242           g_print ("GST_MESSAGE_ERROR: error= %s\n", error->message);
1243
1244           g_print ("GST_MESSAGE_ERROR: debug = %s\n", debug);
1245           if (!gst_element_post_message (GST_ELEMENT(demux), msg)) {
1246             GST_ERROR_OBJECT (demux, "failed to post error");
1247             gst_ss_demux_stop (demux, stream);
1248             g_free(debug);
1249             debug = NULL;
1250             g_error_free(error);
1251             return FALSE;
1252         }
1253         gst_ss_demux_stop (demux, stream);
1254       }
1255
1256       g_free( debug);
1257       debug = NULL;
1258       g_error_free( error);
1259       break;
1260     }
1261     case GST_MESSAGE_BUFFERING: {
1262       int n =0;
1263       int total_cache_perc = 0;
1264       int active_stream_cnt = 0;
1265       GstSSDemuxStream *cur_stream = NULL;
1266       int avg_percent = 0;
1267
1268       /* update buffer percent */
1269       gst_message_parse_buffering (msg, &stream->rcvd_percent);
1270       gchar *name = gst_element_get_name (GST_MESSAGE_SRC (msg));
1271       GST_LOG_OBJECT (stream->pad, "Internal bus : Buffering from %s = %d\n", name, stream->rcvd_percent);
1272       g_free(name);
1273       name = NULL;
1274       // TODO: need to check for better logic
1275       for (n = 0; n < SS_STREAM_NUM; n++) {
1276         cur_stream = demux->streams[n];
1277         if (cur_stream) {
1278           active_stream_cnt++;
1279           total_cache_perc += cur_stream->rcvd_percent;
1280         }
1281       }
1282
1283       avg_percent = total_cache_perc / active_stream_cnt;
1284
1285       GST_LOG_OBJECT (demux, "avg buffering completed = %d", avg_percent);
1286
1287       if (avg_percent > 100)
1288         avg_percent = 100;
1289
1290       // TODO: need to add mutex for protecting percent
1291       if (avg_percent != demux->percent) {
1292         demux->percent = avg_percent;
1293         GST_LOG_OBJECT (demux, "#########Posting %d buffering msg to main bus ###########", demux->percent);
1294
1295         gst_element_post_message (GST_ELEMENT (demux), gst_message_new_buffering (GST_OBJECT (demux), avg_percent));
1296       }
1297     }
1298     break;
1299     case GST_MESSAGE_WARNING: {
1300       char* debug = NULL;
1301       GError* error = NULL;
1302       gst_message_parse_warning(msg, &error, &debug);
1303       GST_WARNING_OBJECT(demux, "warning : %s\n", error->message);
1304       GST_WARNING_OBJECT(demux, "debug : %s\n", debug);
1305       g_error_free( error );
1306       g_free( debug);
1307       break;
1308     }
1309     default : {
1310       GST_LOG_OBJECT(demux, "unhandled message : %s\n", gst_message_type_get_name (GST_MESSAGE_TYPE (msg)));
1311       break;
1312     }
1313   }
1314
1315   return TRUE;
1316 }
1317
1318 static void
1319 gst_ss_demux_update_buffering (GstSSDemuxStream *stream, guint64 percent)
1320 {
1321   gboolean do_post = FALSE;
1322   GstSSDemux *demux = stream->parent;
1323
1324   if (stream->is_buffering) {
1325     do_post = TRUE;
1326     if (percent >= demux->high_percent)
1327       stream->is_buffering = FALSE;
1328   } else {
1329     if (percent < demux->low_percent) {
1330       stream->is_buffering = TRUE;
1331       do_post = TRUE;
1332     }
1333   }
1334
1335   if (do_post) {
1336     GstMessage *message;
1337     GstBufferingMode mode;
1338     gint64 buffering_left = -1;
1339
1340     percent = percent * 100 / demux->high_percent;
1341
1342     if (percent > 100)
1343       percent = 100;
1344
1345     if (percent != stream->percent) {
1346       stream->percent = percent;
1347
1348       GST_DEBUG_OBJECT (stream->pad, "buffering %d percent", (gint) percent);
1349       g_print ("'%s' buffering %d percent done\n", stream->name, (gint) percent);
1350
1351       /* posting buffering to internal bus, which will take average & post to main bus */
1352       message = gst_message_new_buffering (GST_OBJECT_CAST (stream->sink), (gint) percent);
1353       gst_element_post_message (GST_ELEMENT_CAST (stream->sink), message);
1354     }
1355   }
1356
1357 }
1358
1359 static void
1360 gst_ssm_demux_on_new_buffer (GstElement * appsink, void* data)
1361 {
1362   GstSSDemuxStream *stream = (GstSSDemuxStream *)data;
1363   GstSSDemux *demux = stream->parent;
1364   GstBuffer *inbuf = NULL;
1365   GstFlowReturn fret = GST_FLOW_OK;
1366   GstBuffer *headbuf = NULL;
1367   gint64 diff = 0;
1368   gint64 percent = 0;
1369   GTimeVal start = {0, };
1370   GTimeVal stop = {0, };
1371   guint64 push_start_time = 0;
1372   guint64 push_end_time =0;
1373
1374   inbuf = gst_app_sink_pull_buffer ((GstAppSink *)appsink);
1375   if (!inbuf) {
1376     GST_WARNING_OBJECT (demux, "Input buffer not available.,..\n");
1377     return;
1378   }
1379
1380   g_mutex_lock (stream->queue_lock);
1381
1382   stream->download_size += GST_BUFFER_SIZE(inbuf);
1383
1384   /* download rate calculation : note push_start_ts */
1385   g_get_current_time (&start);
1386   push_start_time = (start.tv_sec * 1000000)+ start.tv_usec;
1387
1388   GST_LOG_OBJECT (stream->pad, "Inbuf : size = %d, ts = %"GST_TIME_FORMAT", dur = %"GST_TIME_FORMAT,
1389       GST_BUFFER_SIZE(inbuf), GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(inbuf)), GST_TIME_ARGS(GST_BUFFER_DURATION(inbuf)));
1390
1391   g_queue_push_tail (stream->queue, inbuf);
1392
1393   if (GST_BUFFER_DURATION_IS_VALID (inbuf)) {
1394     stream->cached_duration += GST_BUFFER_DURATION(inbuf);
1395   } else {
1396     g_print ("\nDuration field is not valid.. check this issue !!!!!!!!\n");
1397     GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid duration of a frame"), (NULL));
1398     g_mutex_unlock (stream->queue_lock);
1399     return;
1400   }
1401
1402   if (stream->cached_duration >= 0) {
1403     percent = (stream->cached_duration * 100) / demux->max_cache_time;
1404     //g_print ("[%s] percent done = %d[%"G_GINT64_FORMAT"]\n", ssm_parse_get_stream_name(stream->type), percent, percent);
1405
1406     // TODO: need to decide, whther to call before wait or after ??
1407     gst_ss_demux_update_buffering (stream, percent);
1408
1409     if (percent > 100) {
1410       /* update buffering & wait if space is not available */
1411       GST_DEBUG_OBJECT (stream->pad, "Reached more than 100 percent, queue full & wait till free");
1412       g_cond_wait(stream->queue_full, stream->queue_lock);
1413       GST_DEBUG_OBJECT (stream->pad,"Received signal to add more data...");
1414     }
1415   } else {
1416     g_print ("cached duration can not be negative\n\n\n");
1417     GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid cached duration"), (NULL));
1418     g_mutex_unlock (stream->queue_lock);
1419     return;
1420   }
1421
1422   /* download rate calculation : note push_stop_ts */
1423   g_get_current_time (&stop);
1424   push_end_time = (stop.tv_sec * 1000000)+ stop.tv_usec;
1425
1426   stream->push_block_time += push_end_time - push_start_time;
1427
1428   g_cond_signal (stream->queue_empty);
1429
1430   g_mutex_unlock (stream->queue_lock);
1431   return;
1432 }
1433
1434 static void
1435 gst_ss_demux_stop (GstSSDemux * demux, GstSSDemuxStream *stream)
1436 {
1437   if (GST_TASK_STATE (stream->stream_task) != GST_TASK_STOPPED)
1438     gst_task_stop (stream->stream_task);
1439 }
1440
1441 static void
1442 gst_ss_demux_stream_init (GstSSDemux *demux, GstSSDemuxStream *stream, SS_STREAM_TYPE stream_type)
1443 {
1444   stream->cond = g_cond_new ();
1445   stream->lock = g_mutex_new ();
1446   stream->queue = g_queue_new ();
1447   stream->queue_full = g_cond_new ();
1448   stream->queue_empty = g_cond_new ();
1449   stream->queue_lock = g_mutex_new ();
1450   stream->parent = demux;
1451   stream->pipe = NULL;
1452   stream->urisrc = NULL;
1453   stream->parser = NULL;
1454   stream->sink = NULL;
1455   stream->frag_cnt = 0;
1456   stream->type = stream_type ;
1457   stream->uri = NULL;
1458   stream->start_ts = -1;
1459   stream->sent_ns = FALSE;
1460   stream->switch_ts = GST_CLOCK_TIME_NONE;
1461   stream->avg_dur = GST_CLOCK_TIME_NONE;
1462   stream->percent = 100;
1463   stream->rcvd_percent = 0;
1464   stream->push_block_time = 0;
1465   stream->cached_duration = 0;
1466   stream->download_start_ts = 0;
1467   stream->download_stop_ts = 0;
1468   stream->download_size = 0;
1469
1470   if (stream->type == SS_STREAM_VIDEO) {
1471     stream->pad = gst_pad_new_from_static_template (&ssdemux_videosrc_template, "video");
1472     stream->name = g_strdup("video");
1473   } else if (stream->type == SS_STREAM_AUDIO) {
1474     stream->pad = gst_pad_new_from_static_template (&ssdemux_audiosrc_template, "audio");
1475     stream->name = g_strdup("audio");
1476   } else if (stream->type == SS_STREAM_TEXT) {
1477     stream->pad = gst_pad_new_from_static_template (&ssdemux_subsrc_template, "subtitle");
1478     stream->name = g_strdup("text");
1479   }
1480
1481   GST_PAD_ELEMENT_PRIVATE (stream->pad) = stream;
1482
1483   gst_pad_use_fixed_caps (stream->pad);
1484   gst_pad_set_event_function (stream->pad, gst_ss_demux_handle_src_event);
1485   gst_pad_set_query_function (stream->pad, gst_ss_demux_handle_src_query);
1486
1487   stream->caps = ssm_parse_get_stream_caps (demux->parser, stream->type);
1488   gchar *caps_name = gst_caps_to_string(stream->caps);
1489   g_print ("prepare video caps = %s", caps_name);
1490   g_free(caps_name);
1491
1492   GST_DEBUG_OBJECT (demux, "setting caps %" GST_PTR_FORMAT, stream->caps);
1493   gst_pad_set_caps (stream->pad, stream->caps);
1494
1495   GST_DEBUG_OBJECT (demux, "adding pad %s %p to demux %p", GST_OBJECT_NAME (stream->pad), stream->pad, demux);
1496   gst_pad_set_active (stream->pad, TRUE);
1497   gst_element_add_pad (GST_ELEMENT_CAST (demux), stream->pad);
1498 }
1499
1500 static void
1501 gst_ss_demux_stream_free (GstSSDemux * demux, GstSSDemuxStream * stream)
1502 {
1503   if (stream->queue) {
1504     while (!g_queue_is_empty(stream->queue)) {
1505       gst_buffer_unref (g_queue_pop_head (stream->queue));
1506     }
1507     g_queue_free (stream->queue);
1508     stream->queue = NULL;
1509   }
1510
1511   if (stream->pad) {
1512     gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->pad);
1513     stream->pad = NULL;
1514   }
1515   if (stream->cond) {
1516     g_cond_free (stream->cond);
1517     stream->cond = NULL;
1518   }
1519   if (stream->lock) {
1520     g_mutex_free (stream->lock);
1521     stream->lock = NULL;
1522   }
1523   if (stream->queue_lock) {
1524     g_mutex_free (stream->queue_lock);
1525     stream->queue_lock = NULL;
1526   }
1527   if (stream->queue_full) {
1528     g_cond_free (stream->queue_full);
1529     stream->queue_full = NULL;
1530   }
1531   if (stream->queue_empty) {
1532     g_cond_free (stream->queue_empty);
1533     stream->queue_empty= NULL;
1534   }
1535   g_free (stream);
1536 }
1537 static gboolean
1538 ssdemux_init (GstPlugin * plugin)
1539 {
1540   if (!gst_element_register (plugin, "ssdemux", GST_RANK_PRIMARY,
1541           GST_TYPE_SS_DEMUX) || FALSE)
1542     return FALSE;
1543   return TRUE;
1544 }
1545
1546 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
1547     GST_VERSION_MINOR,
1548     "ssdemux",
1549     "Smooth streaming demux plugin",
1550     ssdemux_init, VERSION, "LGPL", PACKAGE_NAME, "http://www.samsung.com/")
1551