tizen 2.3.1 release
[framework/multimedia/gst-plugins-ext0.10.git] / ssdemux / src / gstssdemux.c
index 6bec15f..ca7c503 100755 (executable)
@@ -16,7 +16,9 @@ enum
   PROP_0,
   PROP_COOKIES,
   PROP_ALLOW_AUDIO_ONLY,
-  PROP_FRAGMENTS_CACHE,
+  PROP_CACHE_TIME,
+  PROP_LOW_PERCENTAGE,
+  PROP_HIGH_PERCENTAGE,
   PROP_BITRATE_SWITCH_TOLERANCE,
   PROP_LAST
 };
@@ -53,15 +55,16 @@ GST_DEBUG_CATEGORY_STATIC (gst_ss_demux_debug);
 static void
 _do_init (GType type)
 {
-  GST_DEBUG_CATEGORY_INIT (gst_ss_demux_debug, "ssdemux", 0,
-      "ssdemux element");
+  GST_DEBUG_CATEGORY_INIT (gst_ss_demux_debug, "ssdemux", 0, "ssdemux element");
 }
 
 GST_BOILERPLATE_FULL (GstSSDemux, gst_ss_demux, GstElement,
     GST_TYPE_ELEMENT, _do_init);
 
-#define DEFAULT_FRAGMENTS_CACHE 0
+#define DEFAULT_CACHE_TIME 6*GST_SECOND
 #define DEFAULT_BITRATE_SWITCH_TOLERANCE 0.4
+#define DEFAULT_LOW_PERCENTAGE 1
+#define DEFAULT_HIGH_PERCENTAGE 99
 
 struct _GstSSDemuxStream
 {
@@ -79,6 +82,9 @@ struct _GstSSDemuxStream
   GstBus *bus;
   GMutex *lock;
   GCond *cond;
+  GMutex *queue_lock;
+  GCond *queue_full;
+  GCond *queue_empty;
   guint frag_cnt;
   GQueue *queue;
   gchar *uri;
@@ -87,6 +93,17 @@ struct _GstSSDemuxStream
   GstCaps *caps;
   guint64 switch_ts;
   guint64 avg_dur;
+  gboolean is_buffering;
+  gint64 percent;
+  gboolean rcvd_percent;
+  guint64 push_block_time;
+  gint64 cached_duration;
+
+  /* for fragment download rate calculation */
+  guint64 download_start_ts;
+  guint64 download_stop_ts;
+  guint64 download_size;
+
 };
 
 static void gst_ss_demux_set_property (GObject * object, guint prop_id,
@@ -107,20 +124,18 @@ static gboolean gst_ss_demux_create_download_pipe (GstSSDemux * demux, GstSSDemu
 static void gst_ss_demux_stop (GstSSDemux * demux, GstSSDemuxStream *stream);
 static gboolean gst_ss_demux_create_dummy_pipe (GstSSDemux * demux, GstSSDemuxStream *stream);
 static gboolean gst_ss_demux_create_dummy_sender(GstSSDemux *demux, GstSSDemuxStream *stream);
+static void gst_ss_demux_push_loop (GstSSDemuxStream *stream);
+static void gst_ss_demux_update_buffering (GstSSDemuxStream *stream, guint64 percent);
 
 static void
 gst_ss_demux_base_init (gpointer g_class)
 {
   GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
 
-  gst_element_class_add_pad_template (element_class,
-      gst_static_pad_template_get (&ssdemux_videosrc_template));
-  gst_element_class_add_pad_template (element_class,
-      gst_static_pad_template_get (&ssdemux_audiosrc_template));
-  gst_element_class_add_pad_template (element_class,
-      gst_static_pad_template_get (&ssdemux_subsrc_template));
-  gst_element_class_add_pad_template (element_class,
-      gst_static_pad_template_get (&ssdemux_sink_template));
+  gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&ssdemux_videosrc_template));
+  gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&ssdemux_audiosrc_template));
+  gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&ssdemux_subsrc_template));
+  gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&ssdemux_sink_template));
 
   gst_element_class_set_details_simple (element_class,
       "SS Demuxer",
@@ -153,11 +168,22 @@ gst_ss_demux_class_init (GstSSDemuxClass * klass)
           "Allow audio only stream download in live case when download rate is less",
           TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
-  /* no.of fragments to cache */
-  g_object_class_install_property (gobject_class, PROP_FRAGMENTS_CACHE,
-      g_param_spec_uint ("fragments-cache", "Fragments cache",
-          "Number of fragments needed to be cached to start playing",
-          0, G_MAXUINT, DEFAULT_FRAGMENTS_CACHE,
+  g_object_class_install_property (gobject_class, PROP_CACHE_TIME,
+      g_param_spec_uint64 ("max-cache-time", "caching time",
+          "amount of data that can be cached in seconds", 0, G_MAXUINT64,
+          DEFAULT_CACHE_TIME,
+          G_PARAM_READWRITE));
+
+  g_object_class_install_property (gobject_class, PROP_LOW_PERCENTAGE,
+      g_param_spec_int ("low-percent", "Low Percent",
+          "Low threshold to start buffering",
+          1, 100, DEFAULT_LOW_PERCENTAGE,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class, PROP_HIGH_PERCENTAGE,
+      g_param_spec_int ("high-percent", "High percent",
+          "High threshold to complete buffering",
+          2, 100, DEFAULT_HIGH_PERCENTAGE,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_BITRATE_SWITCH_TOLERANCE,
@@ -183,12 +209,15 @@ gst_ss_demux_init (GstSSDemux * demux, GstSSDemuxClass * klass)
       GST_DEBUG_FUNCPTR (gst_ss_demux_sink_event));
   gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
 
-  demux->need_cache = TRUE;
-  demux->fragments_cache = DEFAULT_FRAGMENTS_CACHE;
-  demux->cancelled = FALSE;
+  demux->max_cache_time = DEFAULT_CACHE_TIME;
   demux->cookies = NULL;
   demux->ss_mode = SS_MODE_NO_SWITCH;
   demux->switch_eos = FALSE;
+  demux->allow_audio_only = FALSE;
+  demux->percent = 100;
+  demux->low_percent = DEFAULT_LOW_PERCENTAGE;
+  demux->high_percent = DEFAULT_HIGH_PERCENTAGE;
+  demux->eos = FALSE;
 }
 
 static void
@@ -199,6 +228,8 @@ gst_ss_demux_dispose (GObject * obj)
 
   for (n = 0; n < SS_STREAM_NUM; n++) {
     if (demux->streams[n]) {
+      gst_pad_stop_task ((demux->streams[n])->pad);
+      g_print ("\n\n\nstopped the TASK\n\n\n");
       gst_ss_demux_stream_free (demux, demux->streams[n]);
       demux->streams[n] = NULL;
     }
@@ -226,8 +257,14 @@ gst_ss_demux_set_property (GObject * object, guint prop_id,
     case PROP_ALLOW_AUDIO_ONLY:
       demux->allow_audio_only = g_value_get_boolean (value);
       break;
-    case PROP_FRAGMENTS_CACHE:
-      demux->fragments_cache = g_value_get_uint (value);
+    case PROP_CACHE_TIME:
+      demux->max_cache_time = g_value_get_uint64 (value);
+      break;
+    case PROP_LOW_PERCENTAGE:
+      demux->low_percent = g_value_get_int (value);
+      break;
+    case PROP_HIGH_PERCENTAGE:
+      demux->high_percent = g_value_get_int (value);
       break;
     case PROP_BITRATE_SWITCH_TOLERANCE:
       demux->bitrate_switch_tol = g_value_get_float (value);
@@ -251,8 +288,14 @@ gst_ss_demux_get_property (GObject * object, guint prop_id, GValue * value,
     case PROP_ALLOW_AUDIO_ONLY:
       g_value_set_boolean (value, demux->allow_audio_only);
       break;
-    case PROP_FRAGMENTS_CACHE:
-      g_value_set_uint (value, demux->fragments_cache);
+    case PROP_CACHE_TIME:
+      g_value_set_uint64 (value, demux->max_cache_time);
+      break;
+    case PROP_LOW_PERCENTAGE:
+      g_value_set_int (value, demux->low_percent);
+      break;
+    case PROP_HIGH_PERCENTAGE:
+      g_value_set_int (value, demux->high_percent);
       break;
     case PROP_BITRATE_SWITCH_TOLERANCE:
       g_value_set_float (value, demux->bitrate_switch_tol);
@@ -267,7 +310,7 @@ static gboolean
 gst_ss_demux_sink_event (GstPad * pad, GstEvent * event)
 {
   GstSSDemux *demux = GST_SS_DEMUX (gst_pad_get_parent (pad));
-  GstQuery *query;
+  GstQuery *query = NULL;
   gboolean ret;
   gchar *uri;
 
@@ -275,8 +318,8 @@ gst_ss_demux_sink_event (GstPad * pad, GstEvent * event)
     case GST_EVENT_EOS: {
       int i = 0;
       if (demux->manifest == NULL) {
-        GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
-        break;
+        GST_ERROR_OBJECT (demux, "Received EOS without a manifest.");
+        goto error;
       }
 
       GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: mainifest file fetched");
@@ -289,18 +332,38 @@ gst_ss_demux_sink_event (GstPad * pad, GstEvent * event)
         g_free (uri);
       } else {
         GST_ERROR_OBJECT (demux, "failed to query URI from upstream");
-        return FALSE;
+        goto error;
       }
       gst_query_unref (query);
+      query = NULL;
 
       GST_LOG_OBJECT (demux, "data = %p & size = %d", GST_BUFFER_DATA(demux->manifest), GST_BUFFER_SIZE(demux->manifest));
       if (!gst_ssm_parse_manifest (demux->parser, (char *)GST_BUFFER_DATA(demux->manifest), GST_BUFFER_SIZE(demux->manifest))) {
         /* In most cases, this will happen if we set a wrong url in the
          * source element and we have received the 404 HTML response instead of
          * the playlist */
-        GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid playlist."),
-            (NULL));
-        return FALSE;
+        GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid playlist."), (NULL));
+        goto error;
+      }
+
+      {
+        unsigned char *protection_data = NULL;
+        unsigned int protection_len = 0;
+
+        /* get protection-header from manifest parser */
+        ret = gst_ssm_parse_get_protection_header (demux->parser, &protection_data, &protection_len);
+        if (!ret) {
+          GST_ERROR_OBJECT (demux, "failed to get protection header...");
+          GST_ELEMENT_ERROR (demux, RESOURCE, NO_SPACE_LEFT, ("fragment allocation failed..."), (NULL));
+          goto error;
+        }
+
+        if (protection_data && protection_len) {
+          g_print ("Got the protection header...\n");
+          demux->protection_header = gst_buffer_new ();
+          GST_BUFFER_DATA (demux->protection_header) = GST_BUFFER_MALLOCDATA (demux->protection_header) = protection_data;
+          GST_BUFFER_SIZE (demux->protection_header) = protection_len;
+        }
       }
 
       for( i = 0; i < SS_STREAM_NUM; i++) {
@@ -309,9 +372,30 @@ gst_ss_demux_sink_event (GstPad * pad, GstEvent * event)
 
           // Add pad emission of the stream
           gst_ss_demux_stream_init (demux, stream, i);
+
+          if (!gst_pad_is_linked (stream->pad)) {
+            GST_WARNING_OBJECT (demux, "%s - stream pad is not linked...clean up", ssm_parse_get_stream_name(i));
+            gst_ss_demux_stream_free (demux, stream);
+            continue;
+          }
+
+          /* create stream task */
           g_static_rec_mutex_init (&stream->stream_lock);
           stream->stream_task = gst_task_create ((GstTaskFunction) gst_ss_demux_stream_loop, demux);
+          if (NULL == stream->stream_task) {
+            GST_ERROR_OBJECT (demux, "failed to create stream task...");
+            GST_ELEMENT_ERROR (demux, RESOURCE, FAILED, ("failed to create stream task"), (NULL));
+            goto error;
+          }
           gst_task_set_lock (stream->stream_task, &stream->stream_lock);
+
+          /* create stream push loop */
+          if (!gst_pad_start_task (stream->pad, (GstTaskFunction) gst_ss_demux_push_loop, stream)) {
+            GST_ERROR_OBJECT (demux, "failed to create push loop...");
+            GST_ELEMENT_ERROR (demux, RESOURCE, FAILED, ("failed to create push loop"), (NULL));
+            goto error;
+          }
+
           demux->streams[i] = stream;
           g_print ("Starting stream - %d task loop...\n", i);
           gst_task_start (stream->stream_task);
@@ -319,17 +403,33 @@ gst_ss_demux_sink_event (GstPad * pad, GstEvent * event)
       }
 
       gst_event_unref (event);
+      gst_object_unref (demux);
       return TRUE;
     }
     case GST_EVENT_NEWSEGMENT:
       /* Swallow newsegments, we'll push our own */
       gst_event_unref (event);
+      gst_object_unref (demux);
       return TRUE;
     default:
       break;
   }
 
   return gst_pad_event_default (pad, event);
+
+error:
+  // TODO: add closing
+
+  //gst_ss_demux_stop (demux);
+  gst_event_unref (event);
+  gst_object_unref (demux);
+
+  if (query)
+    gst_query_unref (query);
+
+  g_print ("Returning from sink event...\n");
+  return FALSE;
+
 }
 
 static gboolean
@@ -517,19 +617,105 @@ gst_ss_demux_get_next_fragment (GstSSDemux * demux, SS_STREAM_TYPE stream_type)
 
 error:
   {
+    GST_ELEMENT_ERROR (demux, RESOURCE, FAILED, ("failed to download fragment"), (NULL));
     gst_ss_demux_stop (demux, stream);
     return FALSE;
   }
 end_of_list:
   {
     GST_INFO_OBJECT (demux, "Reached end of playlist, sending EOS");
-    gst_pad_push_event (stream->pad, gst_event_new_eos ());
+    demux->eos = TRUE;
     gst_ss_demux_stop (demux, stream);
     return TRUE;
   }
 }
 
 static void
+gst_ss_demux_push_loop (GstSSDemuxStream *stream)
+{
+  GstBuffer *outbuf = NULL;
+  GstSSDemux *demux = stream->parent;
+  GstFlowReturn fret = GST_FLOW_OK;
+
+  // TODO: need to take care of EOS handling....
+
+  g_mutex_lock (stream->queue_lock);
+
+  if (g_queue_is_empty (stream->queue)) {
+    GST_DEBUG_OBJECT (stream->pad,"queue is empty wait till, some buffers are available...");
+    if (demux->eos) {
+      GST_INFO_OBJECT (stream->pad, "stream EOS, pause the task");
+      gst_pad_push_event (stream->pad, gst_event_new_eos ());
+      gst_pad_pause_task (stream->pad);
+      g_print ("Paused the task");
+      return;
+    }
+    g_cond_wait (stream->queue_empty, stream->queue_lock);
+  }
+
+  outbuf = g_queue_pop_head (stream->queue);
+
+  if (GST_BUFFER_DURATION_IS_VALID (outbuf)) {
+    stream->cached_duration -= GST_BUFFER_DURATION(outbuf);
+  } else {
+    g_print ("\nDuration field is not valid.. check this issue !!!!!!!!\n");
+    GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid duration of a frame"), (NULL));
+    g_mutex_unlock (stream->queue_lock);
+    return;
+  }
+
+  g_cond_signal (stream->queue_full);
+  //g_print ("[%s] Signalled full condition...\n", ssm_parse_get_stream_name(stream->type));
+  g_mutex_unlock (stream->queue_lock);
+
+  if (!stream->sent_ns) {
+    guint64 duration = GST_CLOCK_TIME_NONE;
+    guint64 start = GST_CLOCK_TIME_NONE;
+    GstEvent *event = NULL;
+
+    duration = gst_util_uint64_scale (GST_SSM_PARSE_GET_DURATION(demux->parser), GST_SECOND,
+                                      GST_SSM_PARSE_GET_TIMESCALE(demux->parser));
+
+    start = gst_util_uint64_scale (GST_SSM_PARSE_NS_START(demux->parser), GST_SECOND,
+                                   GST_SSM_PARSE_GET_TIMESCALE(demux->parser));
+
+    event = gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME, start, duration, start);
+
+    GST_DEBUG_OBJECT(demux," new_segment start = %"GST_TIME_FORMAT, GST_TIME_ARGS(start));
+
+    if (!gst_pad_push_event (stream->pad, event)) {
+      GST_ERROR_OBJECT (demux, "failed to push newsegment event");
+      return; // No need to close task for this, because sometimes pad can unlined
+    }
+    stream->sent_ns = TRUE;
+  }
+
+  if (stream->type == SS_STREAM_VIDEO && demux->ss_mode == SS_MODE_AONLY) {
+    GST_BUFFER_TIMESTAMP (outbuf) = stream->switch_ts;
+    GST_BUFFER_DURATION (outbuf) = ((float)1/25) * GST_SECOND;
+    stream->switch_ts = GST_BUFFER_TIMESTAMP (outbuf) + GST_BUFFER_DURATION (outbuf);
+    g_print ("Dummy buffers ts : %"GST_TIME_FORMAT" and dur : %"GST_TIME_FORMAT"\n",
+             GST_TIME_ARGS(GST_BUFFER_TIMESTAMP (outbuf)), GST_TIME_ARGS(GST_BUFFER_DURATION (outbuf)));
+    gchar *caps_string = gst_caps_to_string(GST_BUFFER_CAPS(outbuf));
+    g_print ("caps : %s\n", caps_string);
+    g_free(caps_string);
+    caps_string = NULL;
+  }
+
+  /* push data to downstream*/
+  fret = gst_pad_push (stream->pad, outbuf);
+  if (fret != GST_FLOW_OK) {
+    GST_ERROR_OBJECT (demux, "failed to push data, reason : %s", gst_flow_get_name (fret));
+    goto error;
+  }
+
+  //g_print ("[%s] pushed buffer\n", ssm_parse_get_stream_name(stream->type));
+error:
+  // TODO: need to close task & post error to bus
+  return;
+}
+
+static void
 gst_ss_demux_stream_loop (GstSSDemux * demux)
 {
   GThread *self = NULL;
@@ -539,16 +725,18 @@ gst_ss_demux_stream_loop (GstSSDemux * demux)
   self = g_thread_self ();
 
   for (stream_type = 0; stream_type < SS_STREAM_NUM; stream_type++) {
-    if (demux->streams[stream_type]->stream_task->abidata.ABI.thread == self) {
+    if (demux->streams[stream_type] && demux->streams[stream_type]->stream_task->abidata.ABI.thread == self) {
       stream = demux->streams[stream_type];
       break;
     }
   }
 
-  /* download next fragment of stream_type */
-  if (!gst_ss_demux_get_next_fragment (demux, stream_type)) {
-    GST_ERROR_OBJECT (demux, "failed to get next fragment...");
-    goto error;
+  if (stream) {
+    /* download next fragment of stream_type */
+    if (!gst_ss_demux_get_next_fragment (demux, stream_type)) {
+      GST_ERROR_OBJECT (demux, "failed to get next fragment...");
+      goto error;
+    }
   }
 
   return;
@@ -567,6 +755,7 @@ static gboolean
 gst_ss_demux_download_fragment (GstSSDemux *demux, GstSSDemuxStream *stream, const gchar * uri, guint64 start_ts)
 {
   GstStateChangeReturn ret;
+  GTimeVal time = {0, };
 
   g_print ("Going to download fragment : %s\n", uri);
   if (!gst_ss_demux_create_download_pipe (demux, stream, uri, start_ts)) {
@@ -574,6 +763,10 @@ gst_ss_demux_download_fragment (GstSSDemux *demux, GstSSDemuxStream *stream, con
     return FALSE;
   }
 
+  /* download rate calculation : note down start time*/
+  g_get_current_time (&time);
+  stream->download_start_ts = (time.tv_sec * 1000000)+ time.tv_usec;
+
   ret = gst_element_set_state (stream->pipe, GST_STATE_PLAYING);
   if (ret == GST_STATE_CHANGE_FAILURE) {
     GST_ERROR_OBJECT (demux, "set_state failed...");
@@ -711,7 +904,7 @@ static gboolean
 gst_ss_demux_create_download_pipe (GstSSDemux * demux, GstSSDemuxStream *stream, const gchar * uri, guint64 start_ts)
 {
   gchar *name = NULL;
-  GstCaps *caps = NULL;
+  gchar *caps_string = NULL;
 
   if (!gst_uri_is_valid (uri))
     return FALSE;
@@ -721,18 +914,19 @@ gst_ss_demux_create_download_pipe (GstSSDemux * demux, GstSSDemuxStream *stream,
   stream->pipe = gst_pipeline_new (name);
   if (!stream->pipe) {
     GST_ERROR_OBJECT (demux, "failed to create pipeline");
+    g_free(name);
+    name = NULL;
     return FALSE;
   }
-  g_free(name);
 
   name = g_strdup_printf("%s-%s", stream->name, "httpsrc");
   GST_DEBUG ("Creating source element for the URI:%s", uri);
   stream->urisrc = gst_element_make_from_uri (GST_URI_SRC, uri, name);
   if (!stream->urisrc) {
     GST_ERROR_OBJECT (demux, "failed to create urisrc");
+    g_free(name);
     return FALSE;
   }
-  g_free(name);
 
   if (GST_SSM_PARSE_IS_LIVE_PRESENTATION(demux->parser))
     g_object_set (G_OBJECT (stream->urisrc), "is-live", TRUE, NULL);
@@ -743,32 +937,40 @@ gst_ss_demux_create_download_pipe (GstSSDemux * demux, GstSSDemuxStream *stream,
   stream->parser = gst_element_factory_make ("piffdemux", name);
   if (!stream->parser) {
     GST_ERROR_OBJECT (demux, "failed to create piffdemux element");
+    g_free(name);
+    name = NULL;
     return FALSE;
   }
 
-  caps = ssm_parse_get_stream_caps (demux->parser, stream->type);
-  GST_INFO_OBJECT (stream->pad, "prepare caps = %s", gst_caps_to_string(caps));
+  if (stream->caps)
+    gst_caps_unref (stream->caps);
 
-  g_object_set (G_OBJECT (stream->parser), "caps", caps, NULL);
+  stream->caps = ssm_parse_get_stream_caps (demux->parser, stream->type);
+  caps_string = gst_caps_to_string(stream->caps);
+  GST_INFO_OBJECT (stream->pad, "prepare caps = %s", caps_string);
+  g_free(caps_string);
+  caps_string = NULL;
+
+  g_object_set (G_OBJECT (stream->parser), "caps", stream->caps, NULL);
   g_object_set (G_OBJECT (stream->parser), "start-ts", start_ts, NULL);
   g_object_set (G_OBJECT (stream->parser), "duration", GST_SSM_PARSE_GET_DURATION(demux->parser), NULL);
   g_object_set (G_OBJECT (stream->parser), "is-live", GST_SSM_PARSE_IS_LIVE_PRESENTATION(demux->parser), NULL);
   g_object_set (G_OBJECT (stream->parser), "lookahead-count", GST_SSM_PARSE_LOOKAHEAD_COUNT(demux->parser), NULL);
+  if (demux->protection_header)
+    g_object_set (G_OBJECT (stream->parser), "protection-header", demux->protection_header, NULL);
   g_signal_connect (stream->parser, "live-param",  G_CALLBACK (gst_ss_demux_append_live_params), stream);
 
-  g_free(name);
-
   name = g_strdup_printf("%s-%s", stream->name, "sink");
   stream->sink = gst_element_factory_make ("appsink", name);
   if (!stream->sink) {
     GST_ERROR_OBJECT (demux, "failed to create appsink element");
+    g_free(name);
+    name = NULL;
     return FALSE;
   }
   g_object_set (G_OBJECT (stream->sink), "emit-signals", TRUE, "sync", FALSE, NULL);
   g_signal_connect (stream->sink, "new-buffer",  G_CALLBACK (gst_ssm_demux_on_new_buffer), stream);
 
-  g_free(name);
-
   gst_bin_add_many (GST_BIN (stream->pipe), stream->urisrc, stream->parser, stream->sink, NULL);
   if (!gst_element_link_many (stream->urisrc, stream->parser, stream->sink, NULL)) {
     GST_ERROR ("failed to link elements...");
@@ -779,6 +981,9 @@ gst_ss_demux_create_download_pipe (GstSSDemux * demux, GstSSDemuxStream *stream,
   gst_bus_add_watch (stream->bus, (GstBusFunc)gst_ss_demux_download_bus_cb, stream);
   gst_object_unref (stream->bus);
 
+  g_free(name);
+  name = NULL;
+
   return TRUE;
 }
 
@@ -937,20 +1142,27 @@ gst_ss_demux_download_bus_cb(GstBus *bus, GstMessage *msg, gpointer data)
 
   switch (GST_MESSAGE_TYPE(msg)) {
     case GST_MESSAGE_EOS: {
-      guint64 download_rate = -1;
+      GTimeVal time = {0, };
+      gint idx = 0;
+      guint64 total_push_time = 0;
+      guint64 download_rate = 0;
 
       GST_INFO_OBJECT (stream->pad, "received EOS on download pipe..");
       // increase the fragment count on EOS
       stream->frag_cnt++;
 
-      if (g_strrstr (gst_element_get_name(stream->urisrc), "http")) {
-        g_object_get (stream->urisrc, "download-rate", &download_rate, NULL);
-        g_print("*********** '%s' download rate = %d bps **************\n", stream->name, download_rate);
-      }
+      /* download rate calculation : note down start time*/
+      g_get_current_time (&time);
+      stream->download_stop_ts = (time.tv_sec * 1000000)+ time.tv_usec;
 
-      // TODO: need to remove download_rate> 0 check.. make it generic
-      if ((stream->type == SS_STREAM_VIDEO) && (demux->ss_mode != SS_MODE_AONLY) && (download_rate >= 0)) {
-        if (stream->frag_cnt >= demux->fragments_cache) {
+      download_rate = ((stream->download_size * 8 * 1000000) / (stream->download_stop_ts - stream->download_start_ts - stream->push_block_time));
+      g_print("*********** '%s' download rate = %"G_GUINT64_FORMAT" bpssss **************\n", stream->name, download_rate);
+      stream->download_size = 0;
+      stream->download_stop_ts = stream->download_start_ts = 0;
+      stream->push_block_time = 0;
+
+      if ((stream->type == SS_STREAM_VIDEO) && (demux->ss_mode != SS_MODE_AONLY)) {
+        if (!stream->is_buffering) {
           /* for switching, we are considering video download rate only */
           demux->ss_mode = gst_ssm_parse_switch_qualitylevel (demux->parser, download_rate);
         }
@@ -983,19 +1195,21 @@ gst_ss_demux_download_bus_cb(GstBus *bus, GstMessage *msg, gpointer data)
 
       g_print ("Error from %s\n", gst_element_get_name (GST_MESSAGE_SRC(msg)));
 
-      gst_message_parse_error( msg, &error, &debug );
+      gst_message_parse_error( msg, &error, &debug);
       if (error)
         GST_ERROR_OBJECT (demux, "GST_MESSAGE_ERROR: error= %s\n", error->message);
 
       GST_ERROR_OBJECT (demux, "GST_MESSAGE_ERROR: debug = %s\n", debug);
 
       /* handling error, when client requests url, which is yet to be prepared by server */
-      if ((!strncmp(error->message, "Precondition Failed", strlen("Precondition Failed"))) && (5 == error->code)) {
+      if (GST_IS_URI_HANDLER(GST_MESSAGE_SRC(msg))) {
         GstStateChangeReturn ret;
 
         /* wait for 1sec & request the url again */
         // TODO: need to make wait time as generic or Adding loop count to request again & again
-        GST_INFO_OBJECT (demux, "ERROR : code = %d, msg = %s, NEED to request again", error->code, error->message);
+        if (error)
+          GST_INFO_OBJECT (demux, "ERROR : code = %d, msg = %s, NEED to request again", error->code, error->message);
+
         usleep (1000000); // 1 sec
 
         /* put the current pipeline to NULL state */
@@ -1008,6 +1222,9 @@ gst_ss_demux_download_bus_cb(GstBus *bus, GstMessage *msg, gpointer data)
           GST_ERROR_OBJECT (demux, "failed to create download pipeline");
           if (!gst_element_post_message (GST_ELEMENT(demux), msg)) {
             GST_ERROR_OBJECT (demux, "failed to post error");
+            g_free(debug);
+            debug = NULL;
+
             return FALSE;
           }
         }
@@ -1028,9 +1245,9 @@ gst_ss_demux_download_bus_cb(GstBus *bus, GstMessage *msg, gpointer data)
           if (!gst_element_post_message (GST_ELEMENT(demux), msg)) {
             GST_ERROR_OBJECT (demux, "failed to post error");
             gst_ss_demux_stop (demux, stream);
-            g_free( debug);
+            g_free(debug);
             debug = NULL;
-            g_error_free( error);
+            g_error_free(error);
             return FALSE;
         }
         gst_ss_demux_stop (demux, stream);
@@ -1041,6 +1258,44 @@ gst_ss_demux_download_bus_cb(GstBus *bus, GstMessage *msg, gpointer data)
       g_error_free( error);
       break;
     }
+    case GST_MESSAGE_BUFFERING: {
+      int n =0;
+      int total_cache_perc = 0;
+      int active_stream_cnt = 0;
+      GstSSDemuxStream *cur_stream = NULL;
+      int avg_percent = 0;
+
+      /* update buffer percent */
+      gst_message_parse_buffering (msg, &stream->rcvd_percent);
+      gchar *name = gst_element_get_name (GST_MESSAGE_SRC (msg));
+      GST_LOG_OBJECT (stream->pad, "Internal bus : Buffering from %s = %d\n", name, stream->rcvd_percent);
+      g_free(name);
+      name = NULL;
+      // TODO: need to check for better logic
+      for (n = 0; n < SS_STREAM_NUM; n++) {
+        cur_stream = demux->streams[n];
+        if (cur_stream) {
+          active_stream_cnt++;
+          total_cache_perc += cur_stream->rcvd_percent;
+        }
+      }
+
+      avg_percent = total_cache_perc / active_stream_cnt;
+
+      GST_LOG_OBJECT (demux, "avg buffering completed = %d", avg_percent);
+
+      if (avg_percent > 100)
+        avg_percent = 100;
+
+      // TODO: need to add mutex for protecting percent
+      if (avg_percent != demux->percent) {
+        demux->percent = avg_percent;
+        GST_LOG_OBJECT (demux, "#########Posting %d buffering msg to main bus ###########", demux->percent);
+
+        gst_element_post_message (GST_ELEMENT (demux), gst_message_new_buffering (GST_OBJECT (demux), avg_percent));
+      }
+    }
+    break;
     case GST_MESSAGE_WARNING: {
       char* debug = NULL;
       GError* error = NULL;
@@ -1061,12 +1316,60 @@ gst_ss_demux_download_bus_cb(GstBus *bus, GstMessage *msg, gpointer data)
 }
 
 static void
+gst_ss_demux_update_buffering (GstSSDemuxStream *stream, guint64 percent)
+{
+  gboolean do_post = FALSE;
+  GstSSDemux *demux = stream->parent;
+
+  if (stream->is_buffering) {
+    do_post = TRUE;
+    if (percent >= demux->high_percent)
+      stream->is_buffering = FALSE;
+  } else {
+    if (percent < demux->low_percent) {
+      stream->is_buffering = TRUE;
+      do_post = TRUE;
+    }
+  }
+
+  if (do_post) {
+    GstMessage *message;
+    GstBufferingMode mode;
+    gint64 buffering_left = -1;
+
+    percent = percent * 100 / demux->high_percent;
+
+    if (percent > 100)
+      percent = 100;
+
+    if (percent != stream->percent) {
+      stream->percent = percent;
+
+      GST_DEBUG_OBJECT (stream->pad, "buffering %d percent", (gint) percent);
+      g_print ("'%s' buffering %d percent done\n", stream->name, (gint) percent);
+
+      /* posting buffering to internal bus, which will take average & post to main bus */
+      message = gst_message_new_buffering (GST_OBJECT_CAST (stream->sink), (gint) percent);
+      gst_element_post_message (GST_ELEMENT_CAST (stream->sink), message);
+    }
+  }
+
+}
+
+static void
 gst_ssm_demux_on_new_buffer (GstElement * appsink, void* data)
 {
   GstSSDemuxStream *stream = (GstSSDemuxStream *)data;
   GstSSDemux *demux = stream->parent;
   GstBuffer *inbuf = NULL;
   GstFlowReturn fret = GST_FLOW_OK;
+  GstBuffer *headbuf = NULL;
+  gint64 diff = 0;
+  gint64 percent = 0;
+  GTimeVal start = {0, };
+  GTimeVal stop = {0, };
+  guint64 push_start_time = 0;
+  guint64 push_end_time =0;
 
   inbuf = gst_app_sink_pull_buffer ((GstAppSink *)appsink);
   if (!inbuf) {
@@ -1074,67 +1377,57 @@ gst_ssm_demux_on_new_buffer (GstElement * appsink, void* data)
     return;
   }
 
-  GST_LOG_OBJECT (stream->pad, "Inbuf : size = %d, ts = %"GST_TIME_FORMAT", dur = %"GST_TIME_FORMAT,
-      GST_BUFFER_SIZE(inbuf), GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(inbuf)), GST_TIME_ARGS(GST_BUFFER_DURATION(inbuf)));
-
-  // Queue the buffers till fragment cache reached... after reaching start pushing data to respective port
-  if ( stream->frag_cnt < demux->fragments_cache ) {
-    GST_LOG_OBJECT (demux, "queuing data till caching finished...");
-    g_queue_push_tail (stream->queue, inbuf);
-    return;
-  }
-
-  if (!stream->sent_ns) {
-    guint64 duration = GST_CLOCK_TIME_NONE;
-    guint64 start = GST_CLOCK_TIME_NONE;
-    GstEvent *event = NULL;
+  g_mutex_lock (stream->queue_lock);
 
-    duration = gst_util_uint64_scale (GST_SSM_PARSE_GET_DURATION(demux->parser), GST_SECOND,
-               GST_SSM_PARSE_GET_TIMESCALE(demux->parser));
+  stream->download_size += GST_BUFFER_SIZE(inbuf);
 
-    start = gst_util_uint64_scale (GST_SSM_PARSE_NS_START(demux->parser), GST_SECOND,
-               GST_SSM_PARSE_GET_TIMESCALE(demux->parser));
+  /* download rate calculation : note push_start_ts */
+  g_get_current_time (&start);
+  push_start_time = (start.tv_sec * 1000000)+ start.tv_usec;
 
-    event = gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME,
-                                       start, duration, start);
+  GST_LOG_OBJECT (stream->pad, "Inbuf : size = %d, ts = %"GST_TIME_FORMAT", dur = %"GST_TIME_FORMAT,
+      GST_BUFFER_SIZE(inbuf), GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(inbuf)), GST_TIME_ARGS(GST_BUFFER_DURATION(inbuf)));
 
-    GST_DEBUG_OBJECT(demux," new_segment start = %"GST_TIME_FORMAT, GST_TIME_ARGS(start));
+  g_queue_push_tail (stream->queue, inbuf);
 
-    if (!gst_pad_push_event (stream->pad, event)) {
-      GST_ERROR_OBJECT (demux, "failed to push newsegment event");
-      return;
-    }
-    stream->sent_ns = TRUE;
+  if (GST_BUFFER_DURATION_IS_VALID (inbuf)) {
+    stream->cached_duration += GST_BUFFER_DURATION(inbuf);
+  } else {
+    g_print ("\nDuration field is not valid.. check this issue !!!!!!!!\n");
+    GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid duration of a frame"), (NULL));
+    g_mutex_unlock (stream->queue_lock);
+    return;
   }
 
-  while (!g_queue_is_empty(stream->queue)) {
-    GstBuffer *cache_buf = g_queue_pop_head (stream->queue);
+  if (stream->cached_duration >= 0) {
+    percent = (stream->cached_duration * 100) / demux->max_cache_time;
+    //g_print ("[%s] percent done = %d[%"G_GINT64_FORMAT"]\n", ssm_parse_get_stream_name(stream->type), percent, percent);
 
-    fret = gst_pad_push (stream->pad, cache_buf);
-    if (fret != GST_FLOW_OK) {
-      GST_ERROR_OBJECT (demux, "failed to push data, reason : %s", gst_flow_get_name (fret));
-      goto error;
+    // TODO: need to decide, whther to call before wait or after ??
+    gst_ss_demux_update_buffering (stream, percent);
+
+    if (percent > 100) {
+      /* update buffering & wait if space is not available */
+      GST_DEBUG_OBJECT (stream->pad, "Reached more than 100 percent, queue full & wait till free");
+      g_cond_wait(stream->queue_full, stream->queue_lock);
+      GST_DEBUG_OBJECT (stream->pad,"Received signal to add more data...");
     }
+  } else {
+    g_print ("cached duration can not be negative\n\n\n");
+    GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid cached duration"), (NULL));
+    g_mutex_unlock (stream->queue_lock);
+    return;
   }
 
-  if (stream->type == SS_STREAM_VIDEO && demux->ss_mode == SS_MODE_AONLY) {
-    GST_BUFFER_TIMESTAMP (inbuf) = stream->switch_ts;
-    GST_BUFFER_DURATION (inbuf) = ((float)1/25) * GST_SECOND;
-    stream->switch_ts = GST_BUFFER_TIMESTAMP (inbuf) + GST_BUFFER_DURATION (inbuf);
-    g_print ("Dummy buffers ts : %"GST_TIME_FORMAT" and dur : %"GST_TIME_FORMAT"\n",
-               GST_TIME_ARGS(GST_BUFFER_TIMESTAMP (inbuf)), GST_TIME_ARGS(GST_BUFFER_DURATION (inbuf)));
+  /* download rate calculation : note push_stop_ts */
+  g_get_current_time (&stop);
+  push_end_time = (stop.tv_sec * 1000000)+ stop.tv_usec;
 
-    g_print ("caps : %s\n", gst_caps_to_string(GST_BUFFER_CAPS(inbuf)));
-  }
+  stream->push_block_time += push_end_time - push_start_time;
 
-  /* push data to downstream*/
-  fret = gst_pad_push (stream->pad, inbuf);
-  if (fret != GST_FLOW_OK) {
-    GST_ERROR_OBJECT (demux, "failed to push data, reason : %s", gst_flow_get_name (fret));
-    goto error;
-  }
+  g_cond_signal (stream->queue_empty);
 
-error:
+  g_mutex_unlock (stream->queue_lock);
   return;
 }
 
@@ -1151,6 +1444,9 @@ gst_ss_demux_stream_init (GstSSDemux *demux, GstSSDemuxStream *stream, SS_STREAM
   stream->cond = g_cond_new ();
   stream->lock = g_mutex_new ();
   stream->queue = g_queue_new ();
+  stream->queue_full = g_cond_new ();
+  stream->queue_empty = g_cond_new ();
+  stream->queue_lock = g_mutex_new ();
   stream->parent = demux;
   stream->pipe = NULL;
   stream->urisrc = NULL;
@@ -1163,6 +1459,13 @@ gst_ss_demux_stream_init (GstSSDemux *demux, GstSSDemuxStream *stream, SS_STREAM
   stream->sent_ns = FALSE;
   stream->switch_ts = GST_CLOCK_TIME_NONE;
   stream->avg_dur = GST_CLOCK_TIME_NONE;
+  stream->percent = 100;
+  stream->rcvd_percent = 0;
+  stream->push_block_time = 0;
+  stream->cached_duration = 0;
+  stream->download_start_ts = 0;
+  stream->download_stop_ts = 0;
+  stream->download_size = 0;
 
   if (stream->type == SS_STREAM_VIDEO) {
     stream->pad = gst_pad_new_from_static_template (&ssdemux_videosrc_template, "video");
@@ -1182,7 +1485,9 @@ gst_ss_demux_stream_init (GstSSDemux *demux, GstSSDemuxStream *stream, SS_STREAM
   gst_pad_set_query_function (stream->pad, gst_ss_demux_handle_src_query);
 
   stream->caps = ssm_parse_get_stream_caps (demux->parser, stream->type);
-  g_print ("prepare video caps = %s", gst_caps_to_string(stream->caps));
+  gchar *caps_name = gst_caps_to_string(stream->caps);
+  g_print ("prepare video caps = %s", caps_name);
+  g_free(caps_name);
 
   GST_DEBUG_OBJECT (demux, "setting caps %" GST_PTR_FORMAT, stream->caps);
   gst_pad_set_caps (stream->pad, stream->caps);
@@ -1215,7 +1520,18 @@ gst_ss_demux_stream_free (GstSSDemux * demux, GstSSDemuxStream * stream)
     g_mutex_free (stream->lock);
     stream->lock = NULL;
   }
-
+  if (stream->queue_lock) {
+    g_mutex_free (stream->queue_lock);
+    stream->queue_lock = NULL;
+  }
+  if (stream->queue_full) {
+    g_cond_free (stream->queue_full);
+    stream->queue_full = NULL;
+  }
+  if (stream->queue_empty) {
+    g_cond_free (stream->queue_empty);
+    stream->queue_empty= NULL;
+  }
   g_free (stream);
 }
 static gboolean