PROP_0,
PROP_COOKIES,
PROP_ALLOW_AUDIO_ONLY,
- PROP_FRAGMENTS_CACHE,
+ PROP_CACHE_TIME,
+ PROP_LOW_PERCENTAGE,
+ PROP_HIGH_PERCENTAGE,
PROP_BITRATE_SWITCH_TOLERANCE,
PROP_LAST
};
static void
_do_init (GType type)
{
- GST_DEBUG_CATEGORY_INIT (gst_ss_demux_debug, "ssdemux", 0,
- "ssdemux element");
+ GST_DEBUG_CATEGORY_INIT (gst_ss_demux_debug, "ssdemux", 0, "ssdemux element");
}
GST_BOILERPLATE_FULL (GstSSDemux, gst_ss_demux, GstElement,
GST_TYPE_ELEMENT, _do_init);
-#define DEFAULT_FRAGMENTS_CACHE 0
+#define DEFAULT_CACHE_TIME 6*GST_SECOND
#define DEFAULT_BITRATE_SWITCH_TOLERANCE 0.4
+#define DEFAULT_LOW_PERCENTAGE 1
+#define DEFAULT_HIGH_PERCENTAGE 99
struct _GstSSDemuxStream
{
GstBus *bus;
GMutex *lock;
GCond *cond;
+ GMutex *queue_lock;
+ GCond *queue_full;
+ GCond *queue_empty;
guint frag_cnt;
GQueue *queue;
gchar *uri;
GstCaps *caps;
guint64 switch_ts;
guint64 avg_dur;
+ gboolean is_buffering;
+ gint64 percent;
+ gboolean rcvd_percent;
+ guint64 push_block_time;
+ gint64 cached_duration;
+
+ /* for fragment download rate calculation */
+ guint64 download_start_ts;
+ guint64 download_stop_ts;
+ guint64 download_size;
+
};
static void gst_ss_demux_set_property (GObject * object, guint prop_id,
static void gst_ss_demux_stop (GstSSDemux * demux, GstSSDemuxStream *stream);
static gboolean gst_ss_demux_create_dummy_pipe (GstSSDemux * demux, GstSSDemuxStream *stream);
static gboolean gst_ss_demux_create_dummy_sender(GstSSDemux *demux, GstSSDemuxStream *stream);
+static void gst_ss_demux_push_loop (GstSSDemuxStream *stream);
+static void gst_ss_demux_update_buffering (GstSSDemuxStream *stream, guint64 percent);
static void
gst_ss_demux_base_init (gpointer g_class)
{
GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
- gst_element_class_add_pad_template (element_class,
- gst_static_pad_template_get (&ssdemux_videosrc_template));
- gst_element_class_add_pad_template (element_class,
- gst_static_pad_template_get (&ssdemux_audiosrc_template));
- gst_element_class_add_pad_template (element_class,
- gst_static_pad_template_get (&ssdemux_subsrc_template));
- gst_element_class_add_pad_template (element_class,
- gst_static_pad_template_get (&ssdemux_sink_template));
+ gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&ssdemux_videosrc_template));
+ gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&ssdemux_audiosrc_template));
+ gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&ssdemux_subsrc_template));
+ gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&ssdemux_sink_template));
gst_element_class_set_details_simple (element_class,
"SS Demuxer",
"Allow audio only stream download in live case when download rate is less",
TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
- /* no.of fragments to cache */
- g_object_class_install_property (gobject_class, PROP_FRAGMENTS_CACHE,
- g_param_spec_uint ("fragments-cache", "Fragments cache",
- "Number of fragments needed to be cached to start playing",
- 0, G_MAXUINT, DEFAULT_FRAGMENTS_CACHE,
+ g_object_class_install_property (gobject_class, PROP_CACHE_TIME,
+ g_param_spec_uint64 ("max-cache-time", "caching time",
+ "amount of data that can be cached in seconds", 0, G_MAXUINT64,
+ DEFAULT_CACHE_TIME,
+ G_PARAM_READWRITE));
+
+ g_object_class_install_property (gobject_class, PROP_LOW_PERCENTAGE,
+ g_param_spec_int ("low-percent", "Low Percent",
+ "Low threshold to start buffering",
+ 1, 100, DEFAULT_LOW_PERCENTAGE,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_HIGH_PERCENTAGE,
+ g_param_spec_int ("high-percent", "High percent",
+ "High threshold to complete buffering",
+ 2, 100, DEFAULT_HIGH_PERCENTAGE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_BITRATE_SWITCH_TOLERANCE,
GST_DEBUG_FUNCPTR (gst_ss_demux_sink_event));
gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
- demux->need_cache = TRUE;
- demux->fragments_cache = DEFAULT_FRAGMENTS_CACHE;
- demux->cancelled = FALSE;
+ demux->max_cache_time = DEFAULT_CACHE_TIME;
demux->cookies = NULL;
demux->ss_mode = SS_MODE_NO_SWITCH;
demux->switch_eos = FALSE;
+ demux->allow_audio_only = FALSE;
+ demux->percent = 100;
+ demux->low_percent = DEFAULT_LOW_PERCENTAGE;
+ demux->high_percent = DEFAULT_HIGH_PERCENTAGE;
+ demux->eos = FALSE;
}
static void
for (n = 0; n < SS_STREAM_NUM; n++) {
if (demux->streams[n]) {
+ gst_pad_stop_task ((demux->streams[n])->pad);
+ g_print ("\n\n\nstopped the TASK\n\n\n");
gst_ss_demux_stream_free (demux, demux->streams[n]);
demux->streams[n] = NULL;
}
case PROP_ALLOW_AUDIO_ONLY:
demux->allow_audio_only = g_value_get_boolean (value);
break;
- case PROP_FRAGMENTS_CACHE:
- demux->fragments_cache = g_value_get_uint (value);
+ case PROP_CACHE_TIME:
+ demux->max_cache_time = g_value_get_uint64 (value);
+ break;
+ case PROP_LOW_PERCENTAGE:
+ demux->low_percent = g_value_get_int (value);
+ break;
+ case PROP_HIGH_PERCENTAGE:
+ demux->high_percent = g_value_get_int (value);
break;
case PROP_BITRATE_SWITCH_TOLERANCE:
demux->bitrate_switch_tol = g_value_get_float (value);
case PROP_ALLOW_AUDIO_ONLY:
g_value_set_boolean (value, demux->allow_audio_only);
break;
- case PROP_FRAGMENTS_CACHE:
- g_value_set_uint (value, demux->fragments_cache);
+ case PROP_CACHE_TIME:
+ g_value_set_uint64 (value, demux->max_cache_time);
+ break;
+ case PROP_LOW_PERCENTAGE:
+ g_value_set_int (value, demux->low_percent);
+ break;
+ case PROP_HIGH_PERCENTAGE:
+ g_value_set_int (value, demux->high_percent);
break;
case PROP_BITRATE_SWITCH_TOLERANCE:
g_value_set_float (value, demux->bitrate_switch_tol);
gst_ss_demux_sink_event (GstPad * pad, GstEvent * event)
{
GstSSDemux *demux = GST_SS_DEMUX (gst_pad_get_parent (pad));
- GstQuery *query;
+ GstQuery *query = NULL;
gboolean ret;
gchar *uri;
case GST_EVENT_EOS: {
int i = 0;
if (demux->manifest == NULL) {
- GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
- break;
+ GST_ERROR_OBJECT (demux, "Received EOS without a manifest.");
+ goto error;
}
GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: mainifest file fetched");
g_free (uri);
} else {
GST_ERROR_OBJECT (demux, "failed to query URI from upstream");
- return FALSE;
+ goto error;
}
gst_query_unref (query);
+ query = NULL;
GST_LOG_OBJECT (demux, "data = %p & size = %d", GST_BUFFER_DATA(demux->manifest), GST_BUFFER_SIZE(demux->manifest));
if (!gst_ssm_parse_manifest (demux->parser, (char *)GST_BUFFER_DATA(demux->manifest), GST_BUFFER_SIZE(demux->manifest))) {
/* In most cases, this will happen if we set a wrong url in the
* source element and we have received the 404 HTML response instead of
* the playlist */
- GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid playlist."),
- (NULL));
- return FALSE;
+ GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid playlist."), (NULL));
+ goto error;
+ }
+
+ {
+ unsigned char *protection_data = NULL;
+ unsigned int protection_len = 0;
+
+ /* get protection-header from manifest parser */
+ ret = gst_ssm_parse_get_protection_header (demux->parser, &protection_data, &protection_len);
+ if (!ret) {
+ GST_ERROR_OBJECT (demux, "failed to get protection header...");
+ GST_ELEMENT_ERROR (demux, RESOURCE, NO_SPACE_LEFT, ("fragment allocation failed..."), (NULL));
+ goto error;
+ }
+
+ if (protection_data && protection_len) {
+ g_print ("Got the protection header...\n");
+ demux->protection_header = gst_buffer_new ();
+ GST_BUFFER_DATA (demux->protection_header) = GST_BUFFER_MALLOCDATA (demux->protection_header) = protection_data;
+ GST_BUFFER_SIZE (demux->protection_header) = protection_len;
+ }
}
for( i = 0; i < SS_STREAM_NUM; i++) {
// Add pad emission of the stream
gst_ss_demux_stream_init (demux, stream, i);
+
+ if (!gst_pad_is_linked (stream->pad)) {
+ GST_WARNING_OBJECT (demux, "%s - stream pad is not linked...clean up", ssm_parse_get_stream_name(i));
+ gst_ss_demux_stream_free (demux, stream);
+ continue;
+ }
+
+ /* create stream task */
g_static_rec_mutex_init (&stream->stream_lock);
stream->stream_task = gst_task_create ((GstTaskFunction) gst_ss_demux_stream_loop, demux);
+ if (NULL == stream->stream_task) {
+ GST_ERROR_OBJECT (demux, "failed to create stream task...");
+ GST_ELEMENT_ERROR (demux, RESOURCE, FAILED, ("failed to create stream task"), (NULL));
+ goto error;
+ }
gst_task_set_lock (stream->stream_task, &stream->stream_lock);
+
+ /* create stream push loop */
+ if (!gst_pad_start_task (stream->pad, (GstTaskFunction) gst_ss_demux_push_loop, stream)) {
+ GST_ERROR_OBJECT (demux, "failed to create push loop...");
+ GST_ELEMENT_ERROR (demux, RESOURCE, FAILED, ("failed to create push loop"), (NULL));
+ goto error;
+ }
+
demux->streams[i] = stream;
g_print ("Starting stream - %d task loop...\n", i);
gst_task_start (stream->stream_task);
}
gst_event_unref (event);
+ gst_object_unref (demux);
return TRUE;
}
case GST_EVENT_NEWSEGMENT:
/* Swallow newsegments, we'll push our own */
gst_event_unref (event);
+ gst_object_unref (demux);
return TRUE;
default:
break;
}
return gst_pad_event_default (pad, event);
+
+error:
+ // TODO: add closing
+
+ //gst_ss_demux_stop (demux);
+ gst_event_unref (event);
+ gst_object_unref (demux);
+
+ if (query)
+ gst_query_unref (query);
+
+ g_print ("Returning from sink event...\n");
+ return FALSE;
+
}
static gboolean
error:
{
+ GST_ELEMENT_ERROR (demux, RESOURCE, FAILED, ("failed to download fragment"), (NULL));
gst_ss_demux_stop (demux, stream);
return FALSE;
}
end_of_list:
{
GST_INFO_OBJECT (demux, "Reached end of playlist, sending EOS");
- gst_pad_push_event (stream->pad, gst_event_new_eos ());
+ demux->eos = TRUE;
gst_ss_demux_stop (demux, stream);
return TRUE;
}
}
static void
+gst_ss_demux_push_loop (GstSSDemuxStream *stream)
+{
+ GstBuffer *outbuf = NULL;
+ GstSSDemux *demux = stream->parent;
+ GstFlowReturn fret = GST_FLOW_OK;
+
+ // TODO: need to take care of EOS handling....
+
+ g_mutex_lock (stream->queue_lock);
+
+ if (g_queue_is_empty (stream->queue)) {
+ GST_DEBUG_OBJECT (stream->pad,"queue is empty wait till, some buffers are available...");
+ if (demux->eos) {
+ GST_INFO_OBJECT (stream->pad, "stream EOS, pause the task");
+ gst_pad_push_event (stream->pad, gst_event_new_eos ());
+ gst_pad_pause_task (stream->pad);
+ g_print ("Paused the task");
+ return;
+ }
+ g_cond_wait (stream->queue_empty, stream->queue_lock);
+ }
+
+ outbuf = g_queue_pop_head (stream->queue);
+
+ if (GST_BUFFER_DURATION_IS_VALID (outbuf)) {
+ stream->cached_duration -= GST_BUFFER_DURATION(outbuf);
+ } else {
+ g_print ("\nDuration field is not valid.. check this issue !!!!!!!!\n");
+ GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid duration of a frame"), (NULL));
+ g_mutex_unlock (stream->queue_lock);
+ return;
+ }
+
+ g_cond_signal (stream->queue_full);
+ //g_print ("[%s] Signalled full condition...\n", ssm_parse_get_stream_name(stream->type));
+ g_mutex_unlock (stream->queue_lock);
+
+ if (!stream->sent_ns) {
+ guint64 duration = GST_CLOCK_TIME_NONE;
+ guint64 start = GST_CLOCK_TIME_NONE;
+ GstEvent *event = NULL;
+
+ duration = gst_util_uint64_scale (GST_SSM_PARSE_GET_DURATION(demux->parser), GST_SECOND,
+ GST_SSM_PARSE_GET_TIMESCALE(demux->parser));
+
+ start = gst_util_uint64_scale (GST_SSM_PARSE_NS_START(demux->parser), GST_SECOND,
+ GST_SSM_PARSE_GET_TIMESCALE(demux->parser));
+
+ event = gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME, start, duration, start);
+
+ GST_DEBUG_OBJECT(demux," new_segment start = %"GST_TIME_FORMAT, GST_TIME_ARGS(start));
+
+ if (!gst_pad_push_event (stream->pad, event)) {
+ GST_ERROR_OBJECT (demux, "failed to push newsegment event");
+ return; // No need to close task for this, because sometimes pad can unlined
+ }
+ stream->sent_ns = TRUE;
+ }
+
+ if (stream->type == SS_STREAM_VIDEO && demux->ss_mode == SS_MODE_AONLY) {
+ GST_BUFFER_TIMESTAMP (outbuf) = stream->switch_ts;
+ GST_BUFFER_DURATION (outbuf) = ((float)1/25) * GST_SECOND;
+ stream->switch_ts = GST_BUFFER_TIMESTAMP (outbuf) + GST_BUFFER_DURATION (outbuf);
+ g_print ("Dummy buffers ts : %"GST_TIME_FORMAT" and dur : %"GST_TIME_FORMAT"\n",
+ GST_TIME_ARGS(GST_BUFFER_TIMESTAMP (outbuf)), GST_TIME_ARGS(GST_BUFFER_DURATION (outbuf)));
+ gchar *caps_string = gst_caps_to_string(GST_BUFFER_CAPS(outbuf));
+ g_print ("caps : %s\n", caps_string);
+ g_free(caps_string);
+ caps_string = NULL;
+ }
+
+ /* push data to downstream*/
+ fret = gst_pad_push (stream->pad, outbuf);
+ if (fret != GST_FLOW_OK) {
+ GST_ERROR_OBJECT (demux, "failed to push data, reason : %s", gst_flow_get_name (fret));
+ goto error;
+ }
+
+ //g_print ("[%s] pushed buffer\n", ssm_parse_get_stream_name(stream->type));
+error:
+ // TODO: need to close task & post error to bus
+ return;
+}
+
+static void
gst_ss_demux_stream_loop (GstSSDemux * demux)
{
GThread *self = NULL;
self = g_thread_self ();
for (stream_type = 0; stream_type < SS_STREAM_NUM; stream_type++) {
- if (demux->streams[stream_type]->stream_task->abidata.ABI.thread == self) {
+ if (demux->streams[stream_type] && demux->streams[stream_type]->stream_task->abidata.ABI.thread == self) {
stream = demux->streams[stream_type];
break;
}
}
- /* download next fragment of stream_type */
- if (!gst_ss_demux_get_next_fragment (demux, stream_type)) {
- GST_ERROR_OBJECT (demux, "failed to get next fragment...");
- goto error;
+ if (stream) {
+ /* download next fragment of stream_type */
+ if (!gst_ss_demux_get_next_fragment (demux, stream_type)) {
+ GST_ERROR_OBJECT (demux, "failed to get next fragment...");
+ goto error;
+ }
}
return;
gst_ss_demux_download_fragment (GstSSDemux *demux, GstSSDemuxStream *stream, const gchar * uri, guint64 start_ts)
{
GstStateChangeReturn ret;
+ GTimeVal time = {0, };
g_print ("Going to download fragment : %s\n", uri);
if (!gst_ss_demux_create_download_pipe (demux, stream, uri, start_ts)) {
return FALSE;
}
+ /* download rate calculation : note down start time*/
+ g_get_current_time (&time);
+ stream->download_start_ts = (time.tv_sec * 1000000)+ time.tv_usec;
+
ret = gst_element_set_state (stream->pipe, GST_STATE_PLAYING);
if (ret == GST_STATE_CHANGE_FAILURE) {
GST_ERROR_OBJECT (demux, "set_state failed...");
gst_ss_demux_create_download_pipe (GstSSDemux * demux, GstSSDemuxStream *stream, const gchar * uri, guint64 start_ts)
{
gchar *name = NULL;
- GstCaps *caps = NULL;
+ gchar *caps_string = NULL;
if (!gst_uri_is_valid (uri))
return FALSE;
stream->pipe = gst_pipeline_new (name);
if (!stream->pipe) {
GST_ERROR_OBJECT (demux, "failed to create pipeline");
+ g_free(name);
+ name = NULL;
return FALSE;
}
- g_free(name);
name = g_strdup_printf("%s-%s", stream->name, "httpsrc");
GST_DEBUG ("Creating source element for the URI:%s", uri);
stream->urisrc = gst_element_make_from_uri (GST_URI_SRC, uri, name);
if (!stream->urisrc) {
GST_ERROR_OBJECT (demux, "failed to create urisrc");
+ g_free(name);
return FALSE;
}
- g_free(name);
if (GST_SSM_PARSE_IS_LIVE_PRESENTATION(demux->parser))
g_object_set (G_OBJECT (stream->urisrc), "is-live", TRUE, NULL);
stream->parser = gst_element_factory_make ("piffdemux", name);
if (!stream->parser) {
GST_ERROR_OBJECT (demux, "failed to create piffdemux element");
+ g_free(name);
+ name = NULL;
return FALSE;
}
- caps = ssm_parse_get_stream_caps (demux->parser, stream->type);
- GST_INFO_OBJECT (stream->pad, "prepare caps = %s", gst_caps_to_string(caps));
+ if (stream->caps)
+ gst_caps_unref (stream->caps);
- g_object_set (G_OBJECT (stream->parser), "caps", caps, NULL);
+ stream->caps = ssm_parse_get_stream_caps (demux->parser, stream->type);
+ caps_string = gst_caps_to_string(stream->caps);
+ GST_INFO_OBJECT (stream->pad, "prepare caps = %s", caps_string);
+ g_free(caps_string);
+ caps_string = NULL;
+
+ g_object_set (G_OBJECT (stream->parser), "caps", stream->caps, NULL);
g_object_set (G_OBJECT (stream->parser), "start-ts", start_ts, NULL);
g_object_set (G_OBJECT (stream->parser), "duration", GST_SSM_PARSE_GET_DURATION(demux->parser), NULL);
g_object_set (G_OBJECT (stream->parser), "is-live", GST_SSM_PARSE_IS_LIVE_PRESENTATION(demux->parser), NULL);
g_object_set (G_OBJECT (stream->parser), "lookahead-count", GST_SSM_PARSE_LOOKAHEAD_COUNT(demux->parser), NULL);
+ if (demux->protection_header)
+ g_object_set (G_OBJECT (stream->parser), "protection-header", demux->protection_header, NULL);
g_signal_connect (stream->parser, "live-param", G_CALLBACK (gst_ss_demux_append_live_params), stream);
- g_free(name);
-
name = g_strdup_printf("%s-%s", stream->name, "sink");
stream->sink = gst_element_factory_make ("appsink", name);
if (!stream->sink) {
GST_ERROR_OBJECT (demux, "failed to create appsink element");
+ g_free(name);
+ name = NULL;
return FALSE;
}
g_object_set (G_OBJECT (stream->sink), "emit-signals", TRUE, "sync", FALSE, NULL);
g_signal_connect (stream->sink, "new-buffer", G_CALLBACK (gst_ssm_demux_on_new_buffer), stream);
- g_free(name);
-
gst_bin_add_many (GST_BIN (stream->pipe), stream->urisrc, stream->parser, stream->sink, NULL);
if (!gst_element_link_many (stream->urisrc, stream->parser, stream->sink, NULL)) {
GST_ERROR ("failed to link elements...");
gst_bus_add_watch (stream->bus, (GstBusFunc)gst_ss_demux_download_bus_cb, stream);
gst_object_unref (stream->bus);
+ g_free(name);
+ name = NULL;
+
return TRUE;
}
switch (GST_MESSAGE_TYPE(msg)) {
case GST_MESSAGE_EOS: {
- guint64 download_rate = -1;
+ GTimeVal time = {0, };
+ gint idx = 0;
+ guint64 total_push_time = 0;
+ guint64 download_rate = 0;
GST_INFO_OBJECT (stream->pad, "received EOS on download pipe..");
// increase the fragment count on EOS
stream->frag_cnt++;
- if (g_strrstr (gst_element_get_name(stream->urisrc), "http")) {
- g_object_get (stream->urisrc, "download-rate", &download_rate, NULL);
- g_print("*********** '%s' download rate = %d bps **************\n", stream->name, download_rate);
- }
+ /* download rate calculation : note down start time*/
+ g_get_current_time (&time);
+ stream->download_stop_ts = (time.tv_sec * 1000000)+ time.tv_usec;
- // TODO: need to remove download_rate> 0 check.. make it generic
- if ((stream->type == SS_STREAM_VIDEO) && (demux->ss_mode != SS_MODE_AONLY) && (download_rate >= 0)) {
- if (stream->frag_cnt >= demux->fragments_cache) {
+ download_rate = ((stream->download_size * 8 * 1000000) / (stream->download_stop_ts - stream->download_start_ts - stream->push_block_time));
+ g_print("*********** '%s' download rate = %"G_GUINT64_FORMAT" bpssss **************\n", stream->name, download_rate);
+ stream->download_size = 0;
+ stream->download_stop_ts = stream->download_start_ts = 0;
+ stream->push_block_time = 0;
+
+ if ((stream->type == SS_STREAM_VIDEO) && (demux->ss_mode != SS_MODE_AONLY)) {
+ if (!stream->is_buffering) {
/* for switching, we are considering video download rate only */
demux->ss_mode = gst_ssm_parse_switch_qualitylevel (demux->parser, download_rate);
}
g_print ("Error from %s\n", gst_element_get_name (GST_MESSAGE_SRC(msg)));
- gst_message_parse_error( msg, &error, &debug );
+ gst_message_parse_error( msg, &error, &debug);
if (error)
GST_ERROR_OBJECT (demux, "GST_MESSAGE_ERROR: error= %s\n", error->message);
GST_ERROR_OBJECT (demux, "GST_MESSAGE_ERROR: debug = %s\n", debug);
/* handling error, when client requests url, which is yet to be prepared by server */
- if ((!strncmp(error->message, "Precondition Failed", strlen("Precondition Failed"))) && (5 == error->code)) {
+ if (GST_IS_URI_HANDLER(GST_MESSAGE_SRC(msg))) {
GstStateChangeReturn ret;
/* wait for 1sec & request the url again */
// TODO: need to make wait time as generic or Adding loop count to request again & again
- GST_INFO_OBJECT (demux, "ERROR : code = %d, msg = %s, NEED to request again", error->code, error->message);
+ if (error)
+ GST_INFO_OBJECT (demux, "ERROR : code = %d, msg = %s, NEED to request again", error->code, error->message);
+
usleep (1000000); // 1 sec
/* put the current pipeline to NULL state */
GST_ERROR_OBJECT (demux, "failed to create download pipeline");
if (!gst_element_post_message (GST_ELEMENT(demux), msg)) {
GST_ERROR_OBJECT (demux, "failed to post error");
+ g_free(debug);
+ debug = NULL;
+
return FALSE;
}
}
if (!gst_element_post_message (GST_ELEMENT(demux), msg)) {
GST_ERROR_OBJECT (demux, "failed to post error");
gst_ss_demux_stop (demux, stream);
- g_free( debug);
+ g_free(debug);
debug = NULL;
- g_error_free( error);
+ g_error_free(error);
return FALSE;
}
gst_ss_demux_stop (demux, stream);
g_error_free( error);
break;
}
+ case GST_MESSAGE_BUFFERING: {
+ int n =0;
+ int total_cache_perc = 0;
+ int active_stream_cnt = 0;
+ GstSSDemuxStream *cur_stream = NULL;
+ int avg_percent = 0;
+
+ /* update buffer percent */
+ gst_message_parse_buffering (msg, &stream->rcvd_percent);
+ gchar *name = gst_element_get_name (GST_MESSAGE_SRC (msg));
+ GST_LOG_OBJECT (stream->pad, "Internal bus : Buffering from %s = %d\n", name, stream->rcvd_percent);
+ g_free(name);
+ name = NULL;
+ // TODO: need to check for better logic
+ for (n = 0; n < SS_STREAM_NUM; n++) {
+ cur_stream = demux->streams[n];
+ if (cur_stream) {
+ active_stream_cnt++;
+ total_cache_perc += cur_stream->rcvd_percent;
+ }
+ }
+
+ avg_percent = total_cache_perc / active_stream_cnt;
+
+ GST_LOG_OBJECT (demux, "avg buffering completed = %d", avg_percent);
+
+ if (avg_percent > 100)
+ avg_percent = 100;
+
+ // TODO: need to add mutex for protecting percent
+ if (avg_percent != demux->percent) {
+ demux->percent = avg_percent;
+ GST_LOG_OBJECT (demux, "#########Posting %d buffering msg to main bus ###########", demux->percent);
+
+ gst_element_post_message (GST_ELEMENT (demux), gst_message_new_buffering (GST_OBJECT (demux), avg_percent));
+ }
+ }
+ break;
case GST_MESSAGE_WARNING: {
char* debug = NULL;
GError* error = NULL;
}
static void
+gst_ss_demux_update_buffering (GstSSDemuxStream *stream, guint64 percent)
+{
+ gboolean do_post = FALSE;
+ GstSSDemux *demux = stream->parent;
+
+ if (stream->is_buffering) {
+ do_post = TRUE;
+ if (percent >= demux->high_percent)
+ stream->is_buffering = FALSE;
+ } else {
+ if (percent < demux->low_percent) {
+ stream->is_buffering = TRUE;
+ do_post = TRUE;
+ }
+ }
+
+ if (do_post) {
+ GstMessage *message;
+ GstBufferingMode mode;
+ gint64 buffering_left = -1;
+
+ percent = percent * 100 / demux->high_percent;
+
+ if (percent > 100)
+ percent = 100;
+
+ if (percent != stream->percent) {
+ stream->percent = percent;
+
+ GST_DEBUG_OBJECT (stream->pad, "buffering %d percent", (gint) percent);
+ g_print ("'%s' buffering %d percent done\n", stream->name, (gint) percent);
+
+ /* posting buffering to internal bus, which will take average & post to main bus */
+ message = gst_message_new_buffering (GST_OBJECT_CAST (stream->sink), (gint) percent);
+ gst_element_post_message (GST_ELEMENT_CAST (stream->sink), message);
+ }
+ }
+
+}
+
+static void
gst_ssm_demux_on_new_buffer (GstElement * appsink, void* data)
{
GstSSDemuxStream *stream = (GstSSDemuxStream *)data;
GstSSDemux *demux = stream->parent;
GstBuffer *inbuf = NULL;
GstFlowReturn fret = GST_FLOW_OK;
+ GstBuffer *headbuf = NULL;
+ gint64 diff = 0;
+ gint64 percent = 0;
+ GTimeVal start = {0, };
+ GTimeVal stop = {0, };
+ guint64 push_start_time = 0;
+ guint64 push_end_time =0;
inbuf = gst_app_sink_pull_buffer ((GstAppSink *)appsink);
if (!inbuf) {
return;
}
- GST_LOG_OBJECT (stream->pad, "Inbuf : size = %d, ts = %"GST_TIME_FORMAT", dur = %"GST_TIME_FORMAT,
- GST_BUFFER_SIZE(inbuf), GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(inbuf)), GST_TIME_ARGS(GST_BUFFER_DURATION(inbuf)));
-
- // Queue the buffers till fragment cache reached... after reaching start pushing data to respective port
- if ( stream->frag_cnt < demux->fragments_cache ) {
- GST_LOG_OBJECT (demux, "queuing data till caching finished...");
- g_queue_push_tail (stream->queue, inbuf);
- return;
- }
-
- if (!stream->sent_ns) {
- guint64 duration = GST_CLOCK_TIME_NONE;
- guint64 start = GST_CLOCK_TIME_NONE;
- GstEvent *event = NULL;
+ g_mutex_lock (stream->queue_lock);
- duration = gst_util_uint64_scale (GST_SSM_PARSE_GET_DURATION(demux->parser), GST_SECOND,
- GST_SSM_PARSE_GET_TIMESCALE(demux->parser));
+ stream->download_size += GST_BUFFER_SIZE(inbuf);
- start = gst_util_uint64_scale (GST_SSM_PARSE_NS_START(demux->parser), GST_SECOND,
- GST_SSM_PARSE_GET_TIMESCALE(demux->parser));
+ /* download rate calculation : note push_start_ts */
+ g_get_current_time (&start);
+ push_start_time = (start.tv_sec * 1000000)+ start.tv_usec;
- event = gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME,
- start, duration, start);
+ GST_LOG_OBJECT (stream->pad, "Inbuf : size = %d, ts = %"GST_TIME_FORMAT", dur = %"GST_TIME_FORMAT,
+ GST_BUFFER_SIZE(inbuf), GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(inbuf)), GST_TIME_ARGS(GST_BUFFER_DURATION(inbuf)));
- GST_DEBUG_OBJECT(demux," new_segment start = %"GST_TIME_FORMAT, GST_TIME_ARGS(start));
+ g_queue_push_tail (stream->queue, inbuf);
- if (!gst_pad_push_event (stream->pad, event)) {
- GST_ERROR_OBJECT (demux, "failed to push newsegment event");
- return;
- }
- stream->sent_ns = TRUE;
+ if (GST_BUFFER_DURATION_IS_VALID (inbuf)) {
+ stream->cached_duration += GST_BUFFER_DURATION(inbuf);
+ } else {
+ g_print ("\nDuration field is not valid.. check this issue !!!!!!!!\n");
+ GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid duration of a frame"), (NULL));
+ g_mutex_unlock (stream->queue_lock);
+ return;
}
- while (!g_queue_is_empty(stream->queue)) {
- GstBuffer *cache_buf = g_queue_pop_head (stream->queue);
+ if (stream->cached_duration >= 0) {
+ percent = (stream->cached_duration * 100) / demux->max_cache_time;
+ //g_print ("[%s] percent done = %d[%"G_GINT64_FORMAT"]\n", ssm_parse_get_stream_name(stream->type), percent, percent);
- fret = gst_pad_push (stream->pad, cache_buf);
- if (fret != GST_FLOW_OK) {
- GST_ERROR_OBJECT (demux, "failed to push data, reason : %s", gst_flow_get_name (fret));
- goto error;
+ // TODO: need to decide, whther to call before wait or after ??
+ gst_ss_demux_update_buffering (stream, percent);
+
+ if (percent > 100) {
+ /* update buffering & wait if space is not available */
+ GST_DEBUG_OBJECT (stream->pad, "Reached more than 100 percent, queue full & wait till free");
+ g_cond_wait(stream->queue_full, stream->queue_lock);
+ GST_DEBUG_OBJECT (stream->pad,"Received signal to add more data...");
}
+ } else {
+ g_print ("cached duration can not be negative\n\n\n");
+ GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid cached duration"), (NULL));
+ g_mutex_unlock (stream->queue_lock);
+ return;
}
- if (stream->type == SS_STREAM_VIDEO && demux->ss_mode == SS_MODE_AONLY) {
- GST_BUFFER_TIMESTAMP (inbuf) = stream->switch_ts;
- GST_BUFFER_DURATION (inbuf) = ((float)1/25) * GST_SECOND;
- stream->switch_ts = GST_BUFFER_TIMESTAMP (inbuf) + GST_BUFFER_DURATION (inbuf);
- g_print ("Dummy buffers ts : %"GST_TIME_FORMAT" and dur : %"GST_TIME_FORMAT"\n",
- GST_TIME_ARGS(GST_BUFFER_TIMESTAMP (inbuf)), GST_TIME_ARGS(GST_BUFFER_DURATION (inbuf)));
+ /* download rate calculation : note push_stop_ts */
+ g_get_current_time (&stop);
+ push_end_time = (stop.tv_sec * 1000000)+ stop.tv_usec;
- g_print ("caps : %s\n", gst_caps_to_string(GST_BUFFER_CAPS(inbuf)));
- }
+ stream->push_block_time += push_end_time - push_start_time;
- /* push data to downstream*/
- fret = gst_pad_push (stream->pad, inbuf);
- if (fret != GST_FLOW_OK) {
- GST_ERROR_OBJECT (demux, "failed to push data, reason : %s", gst_flow_get_name (fret));
- goto error;
- }
+ g_cond_signal (stream->queue_empty);
-error:
+ g_mutex_unlock (stream->queue_lock);
return;
}
stream->cond = g_cond_new ();
stream->lock = g_mutex_new ();
stream->queue = g_queue_new ();
+ stream->queue_full = g_cond_new ();
+ stream->queue_empty = g_cond_new ();
+ stream->queue_lock = g_mutex_new ();
stream->parent = demux;
stream->pipe = NULL;
stream->urisrc = NULL;
stream->sent_ns = FALSE;
stream->switch_ts = GST_CLOCK_TIME_NONE;
stream->avg_dur = GST_CLOCK_TIME_NONE;
+ stream->percent = 100;
+ stream->rcvd_percent = 0;
+ stream->push_block_time = 0;
+ stream->cached_duration = 0;
+ stream->download_start_ts = 0;
+ stream->download_stop_ts = 0;
+ stream->download_size = 0;
if (stream->type == SS_STREAM_VIDEO) {
stream->pad = gst_pad_new_from_static_template (&ssdemux_videosrc_template, "video");
gst_pad_set_query_function (stream->pad, gst_ss_demux_handle_src_query);
stream->caps = ssm_parse_get_stream_caps (demux->parser, stream->type);
- g_print ("prepare video caps = %s", gst_caps_to_string(stream->caps));
+ gchar *caps_name = gst_caps_to_string(stream->caps);
+ g_print ("prepare video caps = %s", caps_name);
+ g_free(caps_name);
GST_DEBUG_OBJECT (demux, "setting caps %" GST_PTR_FORMAT, stream->caps);
gst_pad_set_caps (stream->pad, stream->caps);
g_mutex_free (stream->lock);
stream->lock = NULL;
}
-
+ if (stream->queue_lock) {
+ g_mutex_free (stream->queue_lock);
+ stream->queue_lock = NULL;
+ }
+ if (stream->queue_full) {
+ g_cond_free (stream->queue_full);
+ stream->queue_full = NULL;
+ }
+ if (stream->queue_empty) {
+ g_cond_free (stream->queue_empty);
+ stream->queue_empty= NULL;
+ }
g_free (stream);
}
static gboolean