6 /* FIXME 0.11: suppress warnings for deprecated API such as GStaticRecMutex
7 * with newer GLib versions (>= 2.31.0) */
8 #define GLIB_DISABLE_DEPRECATION_WARNINGS
11 //#include <gst/glib-compat-private.h>
12 #include "gstssdemux.h"
18 PROP_ALLOW_AUDIO_ONLY,
22 PROP_BITRATE_SWITCH_TOLERANCE,
26 static GstStaticPadTemplate ssdemux_videosrc_template =
27 GST_STATIC_PAD_TEMPLATE ("video",
32 static GstStaticPadTemplate ssdemux_audiosrc_template =
33 GST_STATIC_PAD_TEMPLATE ("audio",
38 static GstStaticPadTemplate ssdemux_subsrc_template =
39 GST_STATIC_PAD_TEMPLATE ("subtitle",
44 static GstStaticPadTemplate ssdemux_sink_template =
45 GST_STATIC_PAD_TEMPLATE ("sink",
48 GST_STATIC_CAPS ("application/x-ss")); // Need to decide source mimetype
50 GST_DEBUG_CATEGORY_STATIC (gst_ss_demux_debug);
51 #define GST_CAT_DEFAULT gst_ss_demux_debug
53 #undef SIMULATE_AUDIO_ONLY /* enable to simulate audio only case forcibly */
58 GST_DEBUG_CATEGORY_INIT (gst_ss_demux_debug, "ssdemux", 0, "ssdemux element");
61 GST_BOILERPLATE_FULL (GstSSDemux, gst_ss_demux, GstElement,
62 GST_TYPE_ELEMENT, _do_init);
64 #define DEFAULT_CACHE_TIME 6*GST_SECOND
65 #define DEFAULT_BITRATE_SWITCH_TOLERANCE 0.4
66 #define DEFAULT_LOW_PERCENTAGE 1
67 #define DEFAULT_HIGH_PERCENTAGE 99
/* Per-stream state of the demuxer. Only part of the structure is visible
 * in this listing; the notes below cover the visible fields only. */
69 struct _GstSSDemuxStream
/* recursive mutex handed to the stream task through gst_task_set_lock() */
77 GStaticRecMutex stream_lock;
/* checked when the download pipe reaches EOS: the video quality switch is
 * skipped while this flag is set */
96 gboolean is_buffering;
98 gboolean rcvd_percent;
/* subtracted from the elapsed download time when the download rate is
 * computed, then reset to 0 */
99 guint64 push_block_time;
/* reduced by the duration of every buffer popped in the push loop */
100 gint64 cached_duration;
102 /* for fragment download rate calculation */
/* both timestamps come from g_get_current_time() and are stored as
 * tv_sec * 1000000 + tv_usec, i.e. in microseconds */
103 guint64 download_start_ts;
104 guint64 download_stop_ts;
/* multiplied by 8 in the rate formula, so presumably a byte count -- TODO confirm */
105 guint64 download_size;
109 static void gst_ss_demux_set_property (GObject * object, guint prop_id,
110 const GValue * value, GParamSpec * pspec);
111 static void gst_ss_demux_get_property (GObject * object, guint prop_id,
112 GValue * value, GParamSpec * pspec);
113 static gboolean gst_ss_demux_sink_event (GstPad * pad, GstEvent * event);
114 static GstStateChangeReturn gst_ss_demux_change_state (GstElement * element, GstStateChange transition);
115 static void gst_ss_demux_dispose (GObject * obj);
116 static GstFlowReturn gst_ss_demux_chain (GstPad * pad, GstBuffer * buf);
117 static void gst_ss_demux_stream_loop (GstSSDemux * demux);
118 static gboolean gst_ss_demux_download_bus_cb(GstBus *bus, GstMessage *msg, gpointer data);
119 static void gst_ss_demux_stream_init (GstSSDemux *demux, GstSSDemuxStream *stream, SS_STREAM_TYPE stream_type);
120 static void gst_ss_demux_stream_free (GstSSDemux * demux, GstSSDemuxStream * stream);
121 static void gst_ssm_demux_on_new_buffer (GstElement * appsink, void* data);
122 static gboolean gst_ss_demux_download_fragment (GstSSDemux *demux, GstSSDemuxStream *stream, const gchar * uri, guint64 start_ts);
123 static gboolean gst_ss_demux_create_download_pipe (GstSSDemux * demux, GstSSDemuxStream *stream, const gchar * uri, guint64 start_ts);
124 static void gst_ss_demux_stop (GstSSDemux * demux, GstSSDemuxStream *stream);
125 static gboolean gst_ss_demux_create_dummy_pipe (GstSSDemux * demux, GstSSDemuxStream *stream);
126 static gboolean gst_ss_demux_create_dummy_sender(GstSSDemux *demux, GstSSDemuxStream *stream);
127 static void gst_ss_demux_push_loop (GstSSDemuxStream *stream);
128 static void gst_ss_demux_update_buffering (GstSSDemuxStream *stream, guint64 percent);
/* Class base-init: registers the video, audio and subtitle source pad
 * templates plus the sink pad template on the element class, and sets the
 * element details (description and author are visible here). */
131 gst_ss_demux_base_init (gpointer g_class)
133 GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
135 gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&ssdemux_videosrc_template));
136 gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&ssdemux_audiosrc_template));
137 gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&ssdemux_subsrc_template));
138 gst_element_class_add_pad_template (element_class, gst_static_pad_template_get (&ssdemux_sink_template));
140 gst_element_class_set_details_simple (element_class,
143 "Smooth Streaming demuxer",
144 "Naveen Cherukuri<naveen.ch@samsung.com>");
/* Class init: hooks up the GObject property accessors and dispose handler,
 * installs the element properties and sets the state-change handler. */
148 gst_ss_demux_class_init (GstSSDemuxClass * klass)
150 GObjectClass *gobject_class;
151 GstElementClass *gstelement_class;
153 gobject_class = (GObjectClass *) klass;
154 gstelement_class = (GstElementClass *) klass;
156 gobject_class->set_property = gst_ss_demux_set_property;
157 gobject_class->get_property = gst_ss_demux_get_property;
158 gobject_class->dispose = gst_ss_demux_dispose;
160 /* to share cookies with other sessions */
/* "cookies" is a boxed G_TYPE_STRV, i.e. a NULL-terminated string array */
161 g_object_class_install_property (gobject_class, PROP_COOKIES,
162 g_param_spec_boxed ("cookies", "Cookies", "HTTP request cookies",
163 G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
165 /* will be considered only in LIVE case */
/* NOTE(review): the declared default here is TRUE, while the instance init
 * function assigns FALSE to demux->allow_audio_only -- confirm which one is
 * intended */
166 g_object_class_install_property (gobject_class, PROP_ALLOW_AUDIO_ONLY,
167 g_param_spec_boolean ("allow-audio-only", "Allow audio only when downloadrate is less in live case",
168 "Allow audio only stream download in live case when download rate is less",
169 TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* NOTE(review): the description says seconds, but the instance default is
 * DEFAULT_CACHE_TIME (6*GST_SECOND) -- confirm the unit of this property */
171 g_object_class_install_property (gobject_class, PROP_CACHE_TIME,
172 g_param_spec_uint64 ("max-cache-time", "caching time",
173 "amount of data that can be cached in seconds", 0, G_MAXUINT64,
/* buffering thresholds: low-percent accepts 1..100, high-percent 2..100 */
177 g_object_class_install_property (gobject_class, PROP_LOW_PERCENTAGE,
178 g_param_spec_int ("low-percent", "Low Percent",
179 "Low threshold to start buffering",
180 1, 100, DEFAULT_LOW_PERCENTAGE,
181 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
183 g_object_class_install_property (gobject_class, PROP_HIGH_PERCENTAGE,
184 g_param_spec_int ("high-percent", "High percent",
185 "High threshold to complete buffering",
186 2, 100, DEFAULT_HIGH_PERCENTAGE,
187 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* tolerance is a float in the range 0..1 */
189 g_object_class_install_property (gobject_class, PROP_BITRATE_SWITCH_TOLERANCE,
190 g_param_spec_float ("bitrate-switch-tolerance",
191 "Bitrate switch tolerance",
192 "Tolerance with respect of the fragment duration to switch to "
193 "a different bitrate if the client is too slow/fast.",
194 0, 1, DEFAULT_BITRATE_SWITCH_TOLERANCE,
195 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
197 gstelement_class->change_state =
198 GST_DEBUG_FUNCPTR (gst_ss_demux_change_state);
/* Instance init: creates the sink pad from its static template, installs
 * the chain and event functions on it, adds it to the element and assigns
 * the initial values of the instance fields. */
202 gst_ss_demux_init (GstSSDemux * demux, GstSSDemuxClass * klass)
205 demux->sinkpad = gst_pad_new_from_static_template (&ssdemux_sink_template, "sink");
206 gst_pad_set_chain_function (demux->sinkpad,
207 GST_DEBUG_FUNCPTR (gst_ss_demux_chain));
208 gst_pad_set_event_function (demux->sinkpad,
209 GST_DEBUG_FUNCPTR (gst_ss_demux_sink_event));
210 gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
212 demux->max_cache_time = DEFAULT_CACHE_TIME;
213 demux->cookies = NULL;
214 demux->ss_mode = SS_MODE_NO_SWITCH;
215 demux->switch_eos = FALSE;
/* NOTE(review): the "allow-audio-only" param spec declares TRUE as default,
 * but FALSE is assigned here -- confirm which value is intended */
216 demux->allow_audio_only = FALSE;
217 demux->percent = 100;
218 demux->low_percent = DEFAULT_LOW_PERCENTAGE;
219 demux->high_percent = DEFAULT_HIGH_PERCENTAGE;
/* NOTE(review): no assignment of demux->bitrate_switch_tol from
 * DEFAULT_BITRATE_SWITCH_TOLERANCE is visible in this listing -- confirm */
/* GObject dispose: for every existing stream, stops the pad task and frees
 * the stream, then frees the manifest parser and chains up to the parent
 * class. */
224 gst_ss_demux_dispose (GObject * obj)
226 GstSSDemux *demux = GST_SS_DEMUX (obj);
229 for (n = 0; n < SS_STREAM_NUM; n++) {
230 if (demux->streams[n]) {
/* stop the push loop task before the stream state is released */
231 gst_pad_stop_task ((demux->streams[n])->pad);
/* NOTE(review): debug trace printed straight to stdout with g_print */
232 g_print ("\n\n\nstopped the TASK\n\n\n");
233 gst_ss_demux_stream_free (demux, demux->streams[n]);
/* slot is cleared after the stream has been freed */
234 demux->streams[n] = NULL;
239 gst_ssm_parse_free (demux->parser);
240 demux->parser = NULL;
243 G_OBJECT_CLASS (parent_class)->dispose (obj);
/* GObject property setter: copies the incoming GValue into the matching
 * demuxer field; unknown ids are reported with
 * G_OBJECT_WARN_INVALID_PROPERTY_ID. */
247 gst_ss_demux_set_property (GObject * object, guint prop_id,
248 const GValue * value, GParamSpec * pspec)
250 GstSSDemux *demux = GST_SS_DEMUX (object);
/* cookies: release the previous string array, then keep a private copy */
254 g_strfreev (demux->cookies);
255 demux->cookies = g_strdupv (g_value_get_boxed (value));
257 case PROP_ALLOW_AUDIO_ONLY:
258 demux->allow_audio_only = g_value_get_boolean (value);
260 case PROP_CACHE_TIME:
261 demux->max_cache_time = g_value_get_uint64 (value);
263 case PROP_LOW_PERCENTAGE:
264 demux->low_percent = g_value_get_int (value);
266 case PROP_HIGH_PERCENTAGE:
267 demux->high_percent = g_value_get_int (value);
269 case PROP_BITRATE_SWITCH_TOLERANCE:
270 demux->bitrate_switch_tol = g_value_get_float (value);
273 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject property getter: stores the matching demuxer field into the
 * GValue; unknown ids are reported with
 * G_OBJECT_WARN_INVALID_PROPERTY_ID. */
279 gst_ss_demux_get_property (GObject * object, guint prop_id, GValue * value,
282 GstSSDemux *demux = GST_SS_DEMUX (object);
/* NOTE(review): g_value_set_boxed() stores its own copy of the boxed value,
 * so the array returned by g_strdupv() looks leaked here; passing
 * demux->cookies directly or using g_value_take_boxed() would avoid that --
 * confirm */
286 g_value_set_boxed (value, g_strdupv (demux->cookies));
288 case PROP_ALLOW_AUDIO_ONLY:
289 g_value_set_boolean (value, demux->allow_audio_only);
291 case PROP_CACHE_TIME:
292 g_value_set_uint64 (value, demux->max_cache_time);
294 case PROP_LOW_PERCENTAGE:
295 g_value_set_int (value, demux->low_percent);
297 case PROP_HIGH_PERCENTAGE:
298 g_value_set_int (value, demux->high_percent);
300 case PROP_BITRATE_SWITCH_TOLERANCE:
301 g_value_set_float (value, demux->bitrate_switch_tol);
304 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* Sink pad event handler.
 *
 * On EOS the accumulated manifest buffer is parsed: the upstream URI is
 * queried, a manifest parser is created from it, the protection header (if
 * any) is wrapped into a buffer, and for every stream type reported by the
 * parser a stream is initialised, given a download task and a push loop,
 * and started. NEWSEGMENT events are dropped. Other events go through
 * gst_pad_event_default(). */
310 gst_ss_demux_sink_event (GstPad * pad, GstEvent * event)
/* gst_pad_get_parent() returns a new reference; the visible exit paths
 * below release it with gst_object_unref (demux) */
312 GstSSDemux *demux = GST_SS_DEMUX (gst_pad_get_parent (pad));
313 GstQuery *query = NULL;
317 switch (event->type) {
318 case GST_EVENT_EOS: {
320 if (demux->manifest == NULL) {
321 GST_ERROR_OBJECT (demux, "Received EOS without a manifest.");
325 GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: mainifest file fetched");
/* ask the upstream peer which URI the manifest came from */
327 query = gst_query_new_uri ();
328 ret = gst_pad_peer_query (demux->sinkpad, query);
/* NOTE(review): gst_query_parse_uri() hands back a newly allocated string;
 * no g_free (uri) is visible in this listing -- confirm */
330 gst_query_parse_uri (query, &uri);
331 demux->parser = gst_ssm_parse_new (uri);
334 GST_ERROR_OBJECT (demux, "failed to query URI from upstream");
337 gst_query_unref (query);
340 GST_LOG_OBJECT (demux, "data = %p & size = %d", GST_BUFFER_DATA(demux->manifest), GST_BUFFER_SIZE(demux->manifest));
341 if (!gst_ssm_parse_manifest (demux->parser, (char *)GST_BUFFER_DATA(demux->manifest), GST_BUFFER_SIZE(demux->manifest))) {
342 /* In most cases, this will happen if we set a wrong url in the
343 * source element and we have received the 404 HTML response instead of
345 GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid playlist."), (NULL));
350 unsigned char *protection_data = NULL;
351 unsigned int protection_len = 0;
353 /* get protection-header from manifest parser */
354 ret = gst_ssm_parse_get_protection_header (demux->parser, &protection_data, &protection_len);
/* NOTE(review): the NO_SPACE_LEFT code and the "fragment allocation failed"
 * text do not describe a protection header failure; looks copied from the
 * fragment append error path -- confirm */
356 GST_ERROR_OBJECT (demux, "failed to get protection header...");
357 GST_ELEMENT_ERROR (demux, RESOURCE, NO_SPACE_LEFT, ("fragment allocation failed..."), (NULL));
361 if (protection_data && protection_len) {
362 g_print ("Got the protection header...\n");
/* wrap the parser-provided bytes in a buffer; MALLOCDATA is set to the same
 * pointer, presumably so the buffer releases it when freed -- TODO confirm
 * the allocator used by the parser matches */
363 demux->protection_header = gst_buffer_new ();
364 GST_BUFFER_DATA (demux->protection_header) = GST_BUFFER_MALLOCDATA (demux->protection_header) = protection_data;
365 GST_BUFFER_SIZE (demux->protection_header) = protection_len;
/* one GstSSDemuxStream per stream type that the parser reports */
369 for( i = 0; i < SS_STREAM_NUM; i++) {
370 if (gst_ssm_parse_check_stream (demux->parser, i)) {
371 GstSSDemuxStream *stream = g_new0 (GstSSDemuxStream, 1);
373 // Add pad emission of the stream
374 gst_ss_demux_stream_init (demux, stream, i);
/* a stream whose pad is not linked is freed again right away */
376 if (!gst_pad_is_linked (stream->pad)) {
377 GST_WARNING_OBJECT (demux, "%s - stream pad is not linked...clean up", ssm_parse_get_stream_name(i));
378 gst_ss_demux_stream_free (demux, stream);
382 /* create stream task */
/* every stream task runs gst_ss_demux_stream_loop with the demuxer as its
 * argument; the loop finds its own stream by comparing task threads */
383 g_static_rec_mutex_init (&stream->stream_lock);
384 stream->stream_task = gst_task_create ((GstTaskFunction) gst_ss_demux_stream_loop, demux);
385 if (NULL == stream->stream_task) {
386 GST_ERROR_OBJECT (demux, "failed to create stream task...");
387 GST_ELEMENT_ERROR (demux, RESOURCE, FAILED, ("failed to create stream task"), (NULL));
390 gst_task_set_lock (stream->stream_task, &stream->stream_lock);
392 /* create stream push loop */
393 if (!gst_pad_start_task (stream->pad, (GstTaskFunction) gst_ss_demux_push_loop, stream)) {
394 GST_ERROR_OBJECT (demux, "failed to create push loop...");
395 GST_ELEMENT_ERROR (demux, RESOURCE, FAILED, ("failed to create push loop"), (NULL));
/* the stream is published in demux->streams before its task is started */
399 demux->streams[i] = stream;
400 g_print ("Starting stream - %d task loop...\n", i);
401 gst_task_start (stream->stream_task);
405 gst_event_unref (event);
406 gst_object_unref (demux);
409 case GST_EVENT_NEWSEGMENT:
410 /* Swallow newsegments, we'll push our own */
411 gst_event_unref (event);
412 gst_object_unref (demux);
418 return gst_pad_event_default (pad, event);
423 //gst_ss_demux_stop (demux);
424 gst_event_unref (event);
425 gst_object_unref (demux);
/* NOTE(review): the query is also unreffed on the normal path above; the
 * labels and jumps that separate the two paths are not visible in this
 * listing -- confirm that no path unrefs it twice or passes NULL */
428 gst_query_unref (query);
430 g_print ("Returning from sink event...\n");
/* Source pad query handler. Answers DURATION queries in TIME format from
 * the manifest parser; a call to gst_pad_query_default() is also visible
 * for the remaining cases. The parent reference taken at the top is
 * released before returning. */
436 gst_ss_demux_handle_src_query (GstPad * pad, GstQuery * query)
438 gboolean res = FALSE;
439 GstSSDemux *ssdemux = GST_SS_DEMUX (gst_pad_get_parent (pad));
441 GST_LOG_OBJECT (pad, "%s query", GST_QUERY_TYPE_NAME (query));
443 // TODO: need to add other query types as well
445 switch (GST_QUERY_TYPE (query)) {
446 case GST_QUERY_DURATION:{
449 gst_query_parse_duration (query, &fmt, NULL);
450 if (fmt == GST_FORMAT_TIME) {
451 gint64 duration = -1;
/* manifest duration is expressed in timescale units; scale it by
 * GST_SECOND / timescale */
453 duration = gst_util_uint64_scale (GST_SSM_PARSE_GET_DURATION(ssdemux->parser), GST_SECOND,
454 GST_SSM_PARSE_GET_TIMESCALE(ssdemux->parser));
456 gst_query_set_duration (query, GST_FORMAT_TIME, duration);
463 res = gst_pad_query_default (pad, query);
467 gst_object_unref (ssdemux);
/* Source pad event handler. The visible part handles seeking: live
 * presentations and non-TIME formats are rejected with a warning, all
 * stream tasks are signalled and stopped, the manifest parser is moved to
 * the requested start position, and for flushing seeks flush-start and
 * flush-stop are pushed on every stream pad before the tasks are
 * restarted. A call to gst_pad_event_default() is visible at the end. */
474 gst_ss_demux_handle_src_event (GstPad * pad, GstEvent * event)
/* NOTE(review): gst_pad_get_parent() returns a new reference and no
 * gst_object_unref (demux) is visible in this listing -- confirm */
476 GstSSDemux *demux = GST_SS_DEMUX (gst_pad_get_parent (pad));
478 switch (event->type) {
484 GstSeekType start_type, stop_type;
487 GstSSDemuxStream *stream = NULL;
489 GST_INFO_OBJECT (demux, "Received GST_EVENT_SEEK");
491 // TODO: should be able to seek in DVR window
492 if (GST_SSM_PARSE_IS_LIVE_PRESENTATION (demux->parser)) {
493 GST_WARNING_OBJECT (demux, "Received seek event for live stream");
497 gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
500 if (format != GST_FORMAT_TIME) {
501 GST_WARNING_OBJECT (demux, "Only time format is supported in seek");
505 GST_DEBUG_OBJECT (demux, "seek event, rate: %f start: %" GST_TIME_FORMAT
506 " stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start),
507 GST_TIME_ARGS (stop));
/* wake up any download wait and stop every stream task; the single '=' in
 * the conditions below is an assignment whose result is tested for NULL */
510 for( i = 0; i < SS_STREAM_NUM; i++) {
511 if (stream = demux->streams[i]) {
512 g_cond_signal (stream->cond);
513 gst_task_stop (stream->stream_task);
517 if (flags & GST_SEEK_FLAG_FLUSH) {
518 GST_INFO_OBJECT (demux, "sending flush start");
520 for( i = 0; i < SS_STREAM_NUM; i++) {
521 if (stream = demux->streams[i]) {
522 gst_pad_push_event (stream->pad, gst_event_new_flush_start ());
/* reposition the manifest parser to the seek start */
527 gst_ssm_parse_seek_manifest (demux->parser, start);
/* NOTE(review): the tasks are restarted only inside this flush branch in
 * the visible lines; confirm how non-flushing seeks restart them */
529 if (flags & GST_SEEK_FLAG_FLUSH) {
530 GST_INFO_OBJECT (demux, "sending flush stop");
531 for( i = 0; i < SS_STREAM_NUM; i++) {
532 if (stream = demux->streams[i]) {
533 gst_pad_push_event (stream->pad, gst_event_new_flush_stop ());
534 GST_LOG_OBJECT (stream->pad, "Starting pad TASK again...\n");
/* clearing sent_ns makes the push loop send a fresh newsegment event */
535 stream->sent_ns = FALSE;
536 stream->frag_cnt = 0; /*resetting to start buffering on SEEK */
537 gst_task_start (stream->stream_task);
548 return gst_pad_event_default (pad, event);
/* Element state-change handler. The visible lines show one switch on the
 * transition before chaining up to the parent class and a second one
 * after it; the bodies of the individual cases are not visible in this
 * listing. */
551 static GstStateChangeReturn
552 gst_ss_demux_change_state (GstElement * element, GstStateChange transition)
554 GstStateChangeReturn ret;
556 switch (transition) {
557 case GST_STATE_CHANGE_READY_TO_PAUSED:
559 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
/* chain up to the parent class implementation */
565 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
567 switch (transition) {
568 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
570 case GST_STATE_CHANGE_PAUSED_TO_READY:
/* Sink pad chain function: accumulates the incoming manifest data in
 * demux->manifest. The first buffer is stored as is; the
 * gst_buffer_join() line presumably belongs to an else branch that is not
 * visible in this listing and appends later buffers -- TODO confirm. */
580 gst_ss_demux_chain (GstPad * pad, GstBuffer * buf)
582 GstSSDemux *demux = GST_SS_DEMUX (gst_pad_get_parent (pad));
584 if (demux->manifest == NULL)
585 demux->manifest = buf;
587 demux->manifest = gst_buffer_join (demux->manifest, buf);
/* release the reference taken by gst_pad_get_parent() */
588 gst_object_unref (demux);
/* Asks the manifest parser for the next fragment URL and start timestamp
 * of the given stream type, records both on the stream and downloads the
 * fragment. The visible error paths post an element error and call
 * gst_ss_demux_stop(). */
595 gst_ss_demux_get_next_fragment (GstSSDemux * demux, SS_STREAM_TYPE stream_type)
597 GstSSDemuxStream *stream = demux->streams[stream_type];
598 gchar *next_fragment_uri = NULL;
599 guint64 start_ts = 0;
601 if (!gst_ssm_parse_get_next_fragment_url (demux->parser, stream_type, &next_fragment_uri, &start_ts )) {
602 GST_INFO_OBJECT (demux, "This Manifest does not contain more fragments");
/* NOTE(review): informational message logged at ERROR level */
606 GST_ERROR_OBJECT (demux, "Fetching next fragment %s", next_fragment_uri);
/* NOTE(review): no release of a previous stream->uri or of
 * next_fragment_uri is visible in this listing -- confirm ownership */
608 stream->uri = g_strdup(next_fragment_uri);
609 stream->start_ts = start_ts;
611 if (!gst_ss_demux_download_fragment (demux, stream, next_fragment_uri, start_ts)) {
612 GST_ERROR_OBJECT (demux, "failed to download fragment...");
620 GST_ELEMENT_ERROR (demux, RESOURCE, FAILED, ("failed to download fragment"), (NULL));
621 gst_ss_demux_stop (demux, stream);
626 GST_INFO_OBJECT (demux, "Reached end of playlist, sending EOS");
628 gst_ss_demux_stop (demux, stream);
/* Pad task function of one stream: takes the next buffer from the stream
 * queue, updates the cached duration, sends a newsegment event if none was
 * sent yet, and pushes the buffer downstream. When the queue is empty it
 * either pushes EOS and pauses the task or waits on the queue_empty
 * condition; the test that selects between the two is not visible in this
 * listing. */
634 gst_ss_demux_push_loop (GstSSDemuxStream *stream)
636 GstBuffer *outbuf = NULL;
637 GstSSDemux *demux = stream->parent;
638 GstFlowReturn fret = GST_FLOW_OK;
640 // TODO: need to take care of EOS handling....
/* the queue and cached_duration are accessed under queue_lock */
642 g_mutex_lock (stream->queue_lock);
644 if (g_queue_is_empty (stream->queue)) {
645 GST_DEBUG_OBJECT (stream->pad,"queue is empty wait till, some buffers are available...");
647 GST_INFO_OBJECT (stream->pad, "stream EOS, pause the task");
648 gst_pad_push_event (stream->pad, gst_event_new_eos ());
649 gst_pad_pause_task (stream->pad);
650 g_print ("Paused the task");
653 g_cond_wait (stream->queue_empty, stream->queue_lock);
656 outbuf = g_queue_pop_head (stream->queue);
/* a buffer without a valid duration is treated as a stream error */
658 if (GST_BUFFER_DURATION_IS_VALID (outbuf)) {
659 stream->cached_duration -= GST_BUFFER_DURATION(outbuf);
661 g_print ("\nDuration field is not valid.. check this issue !!!!!!!!\n");
662 GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid duration of a frame"), (NULL));
663 g_mutex_unlock (stream->queue_lock);
/* signal queue_full after a buffer has been removed from the queue */
667 g_cond_signal (stream->queue_full);
668 //g_print ("[%s] Signalled full condition...\n", ssm_parse_get_stream_name(stream->type));
669 g_mutex_unlock (stream->queue_lock);
/* send our own newsegment once; sent_ns is cleared again on a flushing seek */
671 if (!stream->sent_ns) {
672 guint64 duration = GST_CLOCK_TIME_NONE;
673 guint64 start = GST_CLOCK_TIME_NONE;
674 GstEvent *event = NULL;
/* both values are converted from manifest timescale units with
 * GST_SECOND / timescale */
676 duration = gst_util_uint64_scale (GST_SSM_PARSE_GET_DURATION(demux->parser), GST_SECOND,
677 GST_SSM_PARSE_GET_TIMESCALE(demux->parser));
679 start = gst_util_uint64_scale (GST_SSM_PARSE_NS_START(demux->parser), GST_SECOND,
680 GST_SSM_PARSE_GET_TIMESCALE(demux->parser));
682 event = gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME, start, duration, start);
684 GST_DEBUG_OBJECT(demux," new_segment start = %"GST_TIME_FORMAT, GST_TIME_ARGS(start));
686 if (!gst_pad_push_event (stream->pad, event)) {
687 GST_ERROR_OBJECT (demux, "failed to push newsegment event");
688 return; // No need to close the task for this, because the pad can sometimes be unlinked
690 stream->sent_ns = TRUE;
/* in audio-only mode the video buffers are restamped with a running
 * timestamp that starts at switch_ts and advances by 1/25 second each */
693 if (stream->type == SS_STREAM_VIDEO && demux->ss_mode == SS_MODE_AONLY) {
694 GST_BUFFER_TIMESTAMP (outbuf) = stream->switch_ts;
/* NOTE(review): the duration is computed in single-precision float;
 * GST_SECOND / 25 would be exact integer arithmetic -- confirm */
695 GST_BUFFER_DURATION (outbuf) = ((float)1/25) * GST_SECOND;
696 stream->switch_ts = GST_BUFFER_TIMESTAMP (outbuf) + GST_BUFFER_DURATION (outbuf);
697 g_print ("Dummy buffers ts : %"GST_TIME_FORMAT" and dur : %"GST_TIME_FORMAT"\n",
698 GST_TIME_ARGS(GST_BUFFER_TIMESTAMP (outbuf)), GST_TIME_ARGS(GST_BUFFER_DURATION (outbuf)));
/* NOTE(review): gst_caps_to_string() returns an allocated string; no
 * g_free (caps_string) is visible in this listing -- confirm */
699 gchar *caps_string = gst_caps_to_string(GST_BUFFER_CAPS(outbuf));
700 g_print ("caps : %s\n", caps_string);
705 /* push data to downstream*/
706 fret = gst_pad_push (stream->pad, outbuf);
707 if (fret != GST_FLOW_OK) {
708 GST_ERROR_OBJECT (demux, "failed to push data, reason : %s", gst_flow_get_name (fret));
712 //g_print ("[%s] pushed buffer\n", ssm_parse_get_stream_name(stream->type));
714 // TODO: need to close task & post error to bus
/* Task function shared by all stream download tasks. It receives the
 * demuxer, finds the stream whose task runs on the calling thread and
 * fetches the next fragment for that stream type. On failure the visible
 * lines pause the task, post an element error and stop the stream. */
719 gst_ss_demux_stream_loop (GstSSDemux * demux)
721 GThread *self = NULL;
723 GstSSDemuxStream *stream = NULL;
725 self = g_thread_self ();
/* match the calling thread against the thread recorded in each task; this
 * reads the task's abidata.ABI.thread field directly */
727 for (stream_type = 0; stream_type < SS_STREAM_NUM; stream_type++) {
728 if (demux->streams[stream_type] && demux->streams[stream_type]->stream_task->abidata.ABI.thread == self) {
729 stream = demux->streams[stream_type];
735 /* download next fragment of stream_type */
736 if (!gst_ss_demux_get_next_fragment (demux, stream_type)) {
737 GST_ERROR_OBJECT (demux, "failed to get next fragment...");
746 gst_task_pause (stream->stream_task);
747 GST_ELEMENT_ERROR (demux, RESOURCE, NOT_FOUND,
748 ("could not download fragments"), (NULL));
749 gst_ss_demux_stop (demux, stream);
/* Downloads one fragment: builds the download pipeline for the URI, notes
 * the start time for the download rate calculation, sets the pipeline to
 * PLAYING and blocks on stream->cond until signalled, then brings the
 * pipeline back to NULL. For the video stream in audio-only mode, a dummy
 * frame sender is run after the download pipe has been shut down. */
755 gst_ss_demux_download_fragment (GstSSDemux *demux, GstSSDemuxStream *stream, const gchar * uri, guint64 start_ts)
757 GstStateChangeReturn ret;
758 GTimeVal time = {0, };
760 g_print ("Going to download fragment : %s\n", uri);
761 if (!gst_ss_demux_create_download_pipe (demux, stream, uri, start_ts)) {
762 GST_ERROR_OBJECT (demux, "failed to create download pipeline");
766 /* download rate calculation : note down start time*/
/* stored in microseconds; NOTE(review): tv_sec is multiplied before being
 * widened to guint64, which may overflow where glong is 32 bits -- confirm */
767 g_get_current_time (&time);
768 stream->download_start_ts = (time.tv_sec * 1000000)+ time.tv_usec;
770 ret = gst_element_set_state (stream->pipe, GST_STATE_PLAYING);
771 if (ret == GST_STATE_CHANGE_FAILURE) {
772 GST_ERROR_OBJECT (demux, "set_state failed...");
776 if (stream->pipe && demux->ss_mode == SS_MODE_AONLY &&
777 stream->type == SS_STREAM_VIDEO) {
/* NOTE(review): g_cond_wait() is called without a predicate loop, so a
 * signal sent before the wait starts, or a spurious wakeup, is not
 * handled -- confirm */
779 GST_DEBUG_OBJECT (demux, "Waiting to fetch the URI");
780 g_mutex_lock (stream->lock);
781 g_cond_wait (stream->cond, stream->lock);
782 GST_INFO_OBJECT (stream->pad, "Recived signal to shutdown...");
783 g_mutex_unlock (stream->lock);
785 /* put live pipeline to NULL state to unlink urisrc & piffdemux */
/* the get_state call with GST_CLOCK_TIME_NONE blocks until the state
 * change has completed */
786 gst_element_set_state (stream->pipe, GST_STATE_NULL);
787 gst_element_get_state (stream->pipe, NULL, NULL, GST_CLOCK_TIME_NONE);
/* dummy buffers are timestamped starting from the fragment start */
791 stream->switch_ts = stream->start_ts;
793 /* create dummy frame sender */
794 if (!gst_ss_demux_create_dummy_sender (demux, stream)) {
795 GST_ERROR_OBJECT (demux, "failed to create dummy sender pipeline...");
796 GST_ELEMENT_ERROR (demux, RESOURCE, FAILED, ("Unable to create dummy pipe."), (NULL));
802 * - the download succeed (EOS)
803 * - the download failed (Error message on the fetcher bus)
804 * - the download was canceled
806 GST_DEBUG_OBJECT (demux, "Waiting to fetch the URI");
807 g_mutex_lock (stream->lock);
808 g_cond_wait (stream->cond, stream->lock);
809 GST_INFO_OBJECT (stream->pad, "Recived signal to shutdown...");
810 g_mutex_unlock (stream->lock);
812 gst_element_set_state (stream->pipe, GST_STATE_NULL);
813 gst_element_get_state (stream->pipe, NULL, NULL, GST_CLOCK_TIME_NONE);
/* Runs the dummy frame pipeline for a stream: creates it, sets it to
 * PLAYING, blocks on stream->cond until signalled, and then brings the
 * pipeline back to NULL. */
820 gst_ss_demux_create_dummy_sender(GstSSDemux *demux, GstSSDemuxStream *stream)
822 GstStateChangeReturn ret;
824 if (!gst_ss_demux_create_dummy_pipe (demux, stream)) {
825 GST_ERROR_OBJECT (demux, "failed to create download pipeline");
829 ret = gst_element_set_state (stream->pipe, GST_STATE_PLAYING);
830 if (ret == GST_STATE_CHANGE_FAILURE) {
831 GST_ERROR_OBJECT (demux, "set_state failed...");
836 GST_DEBUG_OBJECT (demux, "Waiting to download next video URI");
837 g_mutex_lock (stream->lock);
838 g_cond_wait (stream->cond, stream->lock);
/* unlike the download path, the state change to NULL happens while
 * stream->lock is still held; the unlock follows below */
840 gst_element_set_state (stream->pipe, GST_STATE_NULL);
841 gst_element_get_state (stream->pipe, NULL, NULL, GST_CLOCK_TIME_NONE);
844 g_mutex_unlock (stream->lock);
/* Handler connected to the "live-param" signal of the piffdemux element.
 * For every entry in param it reads a timestamp and duration (from
 * long_info when present, otherwise from info) and appends a new fragment
 * to the manifest parser for this stream. For the video stream in
 * audio-only mode it then reads the "frame-dur" property and signals
 * stream->cond. data is the GstSSDemuxStream passed at connect time. */
851 gst_ss_demux_append_live_params(GstElement *piffparser, piff_live_param_t *param, gpointer data)
853 GstSSDemuxStream *stream = (GstSSDemuxStream *)data;
854 GstSSDemux *demux = stream->parent;
856 guint64 timestamp = 0;
857 guint64 duration = 0;
859 GST_LOG_OBJECT (demux, "received signal structs count = %d\n", param->count);
861 for (i = 0 ; i< param->count; i++) {
862 if (param->long_info) {
863 piff_fragment_longtime_info *info = &(param->long_info[i]);
864 timestamp = info->ts;
865 duration = info->duration;
866 } else if (param->info) {
867 piff_fragment_time_info *info = &(param->info[i]);
868 timestamp = info->ts;
869 duration = info->duration;
/* NOTE(review): guint64 values are printed with %llu; G_GUINT64_FORMAT is
 * the portable specifier -- confirm */
872 GST_LOG_OBJECT (demux, "Received ts = %llu and dur = %llu\n", timestamp, duration);
874 if (!gst_ssm_parse_append_next_fragment (demux->parser, stream->type, timestamp, duration)) {
875 GST_ERROR_OBJECT (demux, "failed to append new fragment");
876 GST_ELEMENT_ERROR (demux, RESOURCE, NO_SPACE_LEFT, ("fragment allocation failed..."), (NULL));
/* the array received in param is released here with free() and cleared */
881 if (param->long_info) {
882 free (param->long_info);
883 param->long_info = NULL;
893 if ((stream->type == SS_STREAM_VIDEO) && (demux->ss_mode == SS_MODE_AONLY)) {
894 g_print ("\n\n\t\tSignalling download pipe shutdonw....\n\n");
896 g_object_get (stream->parser, "frame-dur", &stream->avg_dur, NULL);
897 g_print ("frame duration = %"GST_TIME_FORMAT"\n\n\n", GST_TIME_ARGS(stream->avg_dur));
/* wakes up the thread blocked in the download wait */
898 g_cond_signal (stream->cond);
/* Builds the download pipeline of a stream: a URI source element linked to
 * piffdemux and then to appsink, all placed in a new pipeline stored in
 * stream->pipe. The parser element is configured from the manifest parser
 * (caps, start timestamp, duration, live flag, lookahead count, optional
 * protection header) and a bus watch is installed on the pipeline bus. */
904 gst_ss_demux_create_download_pipe (GstSSDemux * demux, GstSSDemuxStream *stream, const gchar * uri, guint64 start_ts)
907 gchar *caps_string = NULL;
909 if (!gst_uri_is_valid (uri))
/* NOTE(review): name is reassigned from g_strdup_printf() several times
 * below; the matching g_free calls are not visible in this listing */
912 name = g_strdup_printf("%s-%s", stream->name, "downloader");
914 stream->pipe = gst_pipeline_new (name);
916 GST_ERROR_OBJECT (demux, "failed to create pipeline");
922 name = g_strdup_printf("%s-%s", stream->name, "httpsrc");
923 GST_DEBUG ("Creating source element for the URI:%s", uri);
924 stream->urisrc = gst_element_make_from_uri (GST_URI_SRC, uri, name);
925 if (!stream->urisrc) {
926 GST_ERROR_OBJECT (demux, "failed to create urisrc");
/* the source "is-live" property follows the presentation type */
931 if (GST_SSM_PARSE_IS_LIVE_PRESENTATION(demux->parser))
932 g_object_set (G_OBJECT (stream->urisrc), "is-live", TRUE, NULL);
934 g_object_set (G_OBJECT (stream->urisrc), "is-live", FALSE, NULL);
936 name = g_strdup_printf("%s-%s", stream->name, "parser");
937 stream->parser = gst_element_factory_make ("piffdemux", name);
938 if (!stream->parser) {
939 GST_ERROR_OBJECT (demux, "failed to create piffdemux element");
/* caps held from an earlier call are dropped before new caps are fetched
 * from the manifest parser */
946 gst_caps_unref (stream->caps);
948 stream->caps = ssm_parse_get_stream_caps (demux->parser, stream->type);
949 caps_string = gst_caps_to_string(stream->caps);
950 GST_INFO_OBJECT (stream->pad, "prepare caps = %s", caps_string);
954 g_object_set (G_OBJECT (stream->parser), "caps", stream->caps, NULL);
955 g_object_set (G_OBJECT (stream->parser), "start-ts", start_ts, NULL);
956 g_object_set (G_OBJECT (stream->parser), "duration", GST_SSM_PARSE_GET_DURATION(demux->parser), NULL);
957 g_object_set (G_OBJECT (stream->parser), "is-live", GST_SSM_PARSE_IS_LIVE_PRESENTATION(demux->parser), NULL);
958 g_object_set (G_OBJECT (stream->parser), "lookahead-count", GST_SSM_PARSE_LOOKAHEAD_COUNT(demux->parser), NULL);
959 if (demux->protection_header)
960 g_object_set (G_OBJECT (stream->parser), "protection-header", demux->protection_header, NULL);
/* live fragment timing reported by piffdemux is appended to the manifest */
961 g_signal_connect (stream->parser, "live-param", G_CALLBACK (gst_ss_demux_append_live_params), stream);
963 name = g_strdup_printf("%s-%s", stream->name, "sink");
964 stream->sink = gst_element_factory_make ("appsink", name);
966 GST_ERROR_OBJECT (demux, "failed to create appsink element");
/* appsink emits "new-buffer" for each buffer and does not sync to the clock */
971 g_object_set (G_OBJECT (stream->sink), "emit-signals", TRUE, "sync", FALSE, NULL);
972 g_signal_connect (stream->sink, "new-buffer", G_CALLBACK (gst_ssm_demux_on_new_buffer), stream);
974 gst_bin_add_many (GST_BIN (stream->pipe), stream->urisrc, stream->parser, stream->sink, NULL);
975 if (!gst_element_link_many (stream->urisrc, stream->parser, stream->sink, NULL)) {
976 GST_ERROR ("failed to link elements...");
/* NOTE(review): stream->bus keeps the pointer after this function drops
 * its own reference; the pipeline presumably keeps the bus alive --
 * confirm. A bus watch is dispatched from a GLib main loop, which must be
 * running for the callback to fire */
980 stream->bus = gst_pipeline_get_bus (GST_PIPELINE (stream->pipe));
981 gst_bus_add_watch (stream->bus, (GstBusFunc)gst_ss_demux_download_bus_cb, stream);
982 gst_object_unref (stream->bus);
/* Builds the dummy video pipeline used in audio-only mode, in the variant
 * that encodes a raw frame: imagereader, capsfilter, savsenc_h264 and
 * appsink are linked in that order inside a new pipeline stored in
 * stream->pipe. A second definition with the same name follows in this
 * file; the two are presumably alternatives selected by a preprocessor
 * conditional that is not visible in this listing -- TODO confirm. */
992 gst_ss_demux_create_dummy_pipe (GstSSDemux * demux, GstSSDemuxStream *stream)
995 GstCaps *caps = NULL;
996 GstElement *capsfilter = NULL;
997 GstElement *enc = NULL;
/* NOTE(review): -1 assigned to an unsigned 64-bit variable; neither
 * avg_dur nor frame_rate is used in the visible lines */
998 guint64 avg_dur = -1;
999 guint frame_rate = 0;
1001 name = g_strdup_printf("%s-%s", stream->name, "dummy");
1003 stream->pipe = gst_pipeline_new (name);
1004 if (!stream->pipe) {
1005 GST_ERROR_OBJECT (demux, "failed to create pipeline");
1010 /* create dummy sender source */
1011 name = g_strdup_printf("%s-%s", stream->name, "dummysrc");
1012 stream->urisrc = gst_element_factory_make ("imagereader", name);
1013 if (!stream->urisrc) {
1014 GST_ERROR_OBJECT (demux,"failed to create filesrc element");
/* hard-coded raw frame location, frame rate and number of buffers */
1018 g_object_set (G_OBJECT (stream->urisrc), "location", "/opt/home/root/aonly_VGA_1frame_I420.yuv", NULL);
1019 g_object_set (G_OBJECT (stream->urisrc), "framerate", 25, NULL);
1020 g_object_set (G_OBJECT (stream->urisrc), "num-buffers", 60, NULL);
1023 capsfilter = gst_element_factory_make ("capsfilter", NULL);
1025 GST_ERROR_OBJECT (demux, "failed to create capsfilter element");
/* raw I420 caps, 640x480 at 25/1 fps; NOTE(review): no gst_caps_unref
 * (caps) after setting the property is visible in this listing */
1028 caps = gst_caps_new_simple ("video/x-raw-yuv",
1029 "width", G_TYPE_INT, 640,
1030 "height", G_TYPE_INT, 480,
1031 "framerate",GST_TYPE_FRACTION, 25,1,
1032 "format", GST_TYPE_FOURCC, GST_MAKE_FOURCC ('I', '4', '2', '0'),
1034 g_object_set (G_OBJECT (capsfilter), "caps", caps, NULL);
1036 /* create H.264 encoder element (savsenc_h264) */
1037 enc = gst_element_factory_make ("savsenc_h264", "H264 encoder");
1039 GST_ERROR_OBJECT (demux, "failed to create h264 parse element");
1042 name = g_strdup_printf("%s-%s", stream->name, "sink");
1043 stream->sink = gst_element_factory_make ("appsink", name);
1044 if (!stream->sink) {
1045 GST_ERROR_OBJECT (demux, "failed to create appsink element");
1049 g_object_set (G_OBJECT (stream->sink), "emit-signals", TRUE, "sync", FALSE, NULL);
1050 g_signal_connect (stream->sink, "new-buffer", G_CALLBACK (gst_ssm_demux_on_new_buffer), stream);
1052 /* add to pipeline & link all elements */
1053 gst_bin_add_many (GST_BIN (stream->pipe), stream->urisrc, capsfilter, enc, stream->sink, NULL);
1055 if (!gst_element_link_many (stream->urisrc, capsfilter, enc, stream->sink, NULL)) {
1056 GST_ERROR_OBJECT (demux,"failed to link dummy pipe elements...");
/* bus messages of the dummy pipeline go to the same callback as those of
 * the download pipeline */
1060 stream->bus = gst_pipeline_get_bus (GST_PIPELINE (stream->pipe));
1061 gst_bus_add_watch (stream->bus, (GstBusFunc)gst_ss_demux_download_bus_cb, stream);
1062 gst_object_unref (stream->bus);
/* Builds the dummy video pipeline used in audio-only mode, in the variant
 * that replays a pre-encoded file: filesrc, legacyh264parse and appsink
 * are linked in that order inside a new pipeline stored in stream->pipe.
 * An earlier definition with the same name exists in this file; the two
 * are presumably alternatives selected by a preprocessor conditional that
 * is not visible in this listing -- TODO confirm. */
1068 gst_ss_demux_create_dummy_pipe (GstSSDemux * demux, GstSSDemuxStream *stream)
1072 GstCaps *caps = NULL;
1074 name = g_strdup_printf("%s-%s", stream->name, "dummy");
1076 stream->pipe = gst_pipeline_new (name);
1077 if (!stream->pipe) {
1078 GST_ERROR_OBJECT (demux, "failed to create pipeline");
1083 /* create dummy sender source */
1084 name = g_strdup_printf("%s-%s", stream->name, "dummysrc");
1085 stream->urisrc = gst_element_factory_make ("filesrc", name);
1086 if (!stream->urisrc) {
1087 GST_ERROR_OBJECT (demux,"failed to create filesrc element");
/* hard-coded location of the pre-encoded H.264 clip */
1091 g_object_set (G_OBJECT (stream->urisrc), "location", "/opt/home/root/sound_2sec.264", NULL);
1093 /* create h264 parser element */
1094 name = g_strdup_printf("%s-%s", stream->name, "parser");
1095 stream->parser= gst_element_factory_make ("legacyh264parse", name);
1096 if (!stream->parser) {
1097 GST_ERROR_OBJECT (demux, "failed to create h264 parse element");
/* NOTE(review): magic value 1 for "output-format"; presumably selects
 * byte-stream output to match the appsink caps below -- confirm */
1100 g_object_set (G_OBJECT (stream->parser), "output-format", 1, NULL);
1102 /* create appsink element */
1103 name = g_strdup_printf("%s-%s", stream->name, "sink");
1104 stream->sink = gst_element_factory_make ("appsink", name);
1105 if (!stream->sink) {
1106 GST_ERROR_OBJECT (demux, "failed to create appsink element");
1109 g_object_set (G_OBJECT (stream->sink), "emit-signals", TRUE, "sync", FALSE, NULL);
/* appsink caps: 640x480 H.264 byte-stream; NOTE(review): no
 * gst_caps_unref (caps) after setting the property is visible here */
1111 caps = gst_caps_new_simple ("video/x-h264",
1112 "width", G_TYPE_INT, 640,
1113 "height", G_TYPE_INT, 480,
1114 "stream-format", G_TYPE_STRING, "byte-stream",
1116 g_object_set (G_OBJECT (stream->sink), "caps", caps, NULL);
1118 g_signal_connect (stream->sink, "new-buffer", G_CALLBACK (gst_ssm_demux_on_new_buffer), stream);
1121 /* add to pipeline & link all elements */
1122 gst_bin_add_many (GST_BIN (stream->pipe), stream->urisrc, stream->parser, stream->sink, NULL);
1123 if (!gst_element_link_many (stream->urisrc, stream->parser, stream->sink, NULL)) {
1124 GST_ERROR_OBJECT (demux,"failed to link elements...");
/* this variant uses a local bus variable and leaves stream->bus untouched */
1128 bus = gst_pipeline_get_bus (GST_PIPELINE (stream->pipe));
1129 gst_bus_add_watch (bus, (GstBusFunc)gst_ss_demux_download_bus_cb, stream);
1130 gst_object_unref (bus);
/*
 * gst_ss_demux_download_bus_cb:
 * @bus: bus delivering the message (not referenced in the visible body)
 * @msg: message taken from the download pipeline's bus
 * @data: the GstSSDemuxStream that was handed to gst_bus_add_watch()
 *
 * Bus watch for one stream's download pipeline.  Dispatches on the message
 * type:
 *  - EOS: computes the fragment download rate, passes it to
 *    gst_ssm_parse_switch_qualitylevel() (whose result becomes
 *    demux->ss_mode) and signals stream->cond.
 *  - ERROR: when the message source is a URI handler, sleeps, tears the
 *    pipeline down and re-creates it for stream->uri; the other visible
 *    statements print the error, post @msg on the demux element and stop
 *    the stream task.
 *  - BUFFERING: stores the reported percent in stream->rcvd_percent,
 *    averages rcvd_percent over demux->streams[] and posts the average on
 *    the demux element.
 *  - WARNING: logs the warning message and its debug string.
 * Remaining message types reach the "unhandled message" log line.
 */
1138 gst_ss_demux_download_bus_cb(GstBus *bus, GstMessage *msg, gpointer data)
1140 GstSSDemuxStream *stream = (GstSSDemuxStream *)data;
1141 GstSSDemux *demux = stream->parent;
1143 switch (GST_MESSAGE_TYPE(msg)) {
1144 case GST_MESSAGE_EOS: {
1145 GTimeVal time = {0, };
/* NOTE(review): total_push_time is not referenced by any other visible line. */
1147 guint64 total_push_time = 0;
1148 guint64 download_rate = 0;
1150 GST_INFO_OBJECT (stream->pad, "received EOS on download pipe..");
1151 // increase the fragment count on EOS
1154 /* download rate calculation : note down stop time */
1155 g_get_current_time (&time);
/* timestamp in microseconds */
1156 stream->download_stop_ts = (time.tv_sec * 1000000)+ time.tv_usec;
/* Rate in bits per second: bytes * 8, scaled by 1000000 because the
 * timestamps are in microseconds.  push_block_time (accumulated in
 * gst_ssm_demux_on_new_buffer) is subtracted from the elapsed time.
 * NOTE(review): all operands are guint64; the divisor is zero when the
 * elapsed time equals push_block_time and wraps when it is smaller --
 * confirm this cannot happen or add a guard. */
1158 download_rate = ((stream->download_size * 8 * 1000000) / (stream->download_stop_ts - stream->download_start_ts - stream->push_block_time));
1159 g_print("*********** '%s' download rate = %"G_GUINT64_FORMAT" bpssss **************\n", stream->name, download_rate);
/* reset the per-fragment accounting for the next download */
1160 stream->download_size = 0;
1161 stream->download_stop_ts = stream->download_start_ts = 0;
1162 stream->push_block_time = 0;
/* Only one stream drives switching: video when the demux is not in
 * audio-only mode (and the stream is not buffering), audio otherwise. */
1164 if ((stream->type == SS_STREAM_VIDEO) && (demux->ss_mode != SS_MODE_AONLY)) {
1165 if (!stream->is_buffering) {
1166 /* for switching, we are considering video download rate only */
1167 demux->ss_mode = gst_ssm_parse_switch_qualitylevel (demux->parser, download_rate);
1169 } else if (stream->type == SS_STREAM_AUDIO && (demux->ss_mode == SS_MODE_AONLY)) {
1170 /* when video is not present using audio download rate to calculate switching */
1171 demux->ss_mode = gst_ssm_parse_switch_qualitylevel (demux->parser, download_rate);
1172 if (demux->ss_mode != SS_MODE_AONLY) {
1173 g_print ("\n\nMoving to AV mode by audio considering audio download rate\n\n\n\n");
/* signal stream->cond; the waiting side is not part of this function */
1177 g_cond_signal (stream->cond);
1179 #ifdef SIMULATE_AUDIO_ONLY
1180 /* when fragment count is multiple of 4, switch to audio only case */
1181 if ((stream->frag_cnt % 4 == 0) && (stream->type == SS_STREAM_VIDEO) &&
1182 GST_SSM_PARSE_IS_LIVE_PRESENTATION(demux->parser)) {
1183 g_print ("\n\t ######## Forcibly switching to audio only for testing ##########\n");
1184 demux->ss_mode = SS_MODE_AONLY;
1187 GST_DEBUG_OBJECT (stream->pad, "Signalling eos condition...");
1189 GST_DEBUG_OBJECT (demux, "number of fragments downloaded = %d", stream->frag_cnt);
1192 case GST_MESSAGE_ERROR: {
1193 GError *error = NULL;
1194 gchar* debug = NULL;
/* NOTE(review): gst_element_get_name() returns a newly allocated copy of
 * the name; passing it straight to g_print() leaves nothing to g_free(). */
1196 g_print ("Error from %s\n", gst_element_get_name (GST_MESSAGE_SRC(msg)));
1198 gst_message_parse_error( msg, &error, &debug);
1200 GST_ERROR_OBJECT (demux, "GST_MESSAGE_ERROR: error= %s\n", error->message);
1202 GST_ERROR_OBJECT (demux, "GST_MESSAGE_ERROR: debug = %s\n", debug);
1204 /* handling error, when client requests url, which is yet to be prepared by server */
1205 if (GST_IS_URI_HANDLER(GST_MESSAGE_SRC(msg))) {
1206 GstStateChangeReturn ret;
1208 /* wait for 1sec & request the url again */
1209 // TODO: need to make wait time as generic or Adding loop count to request again & again
1211 GST_INFO_OBJECT (demux, "ERROR : code = %d, msg = %s, NEED to request again", error->code, error->message);
/* NOTE(review): this sleeps inside the bus watch callback, so whichever
 * thread dispatches the watch is blocked for the whole second. */
1213 usleep (1000000); // 1 sec
1215 /* put the current pipeline to NULL state */
1216 gst_element_set_state (stream->pipe, GST_STATE_NULL);
/* block until the state change has completed (no timeout) */
1217 gst_element_get_state (stream->pipe, NULL, NULL, GST_CLOCK_TIME_NONE);
/* Forget the old elements before a new pipeline is built.
 * NOTE(review): no gst_object_unref() of the old pipeline is visible
 * before the pointer is cleared -- confirm it is released elsewhere. */
1218 stream->pipe = stream->urisrc = stream->parser = stream->sink = NULL;
1220 g_print ("Going to download fragment AGAIN : %s\n", stream->uri);
1221 if (!gst_ss_demux_create_download_pipe (demux, stream, stream->uri, stream->start_ts)) {
1222 GST_ERROR_OBJECT (demux, "failed to create download pipeline");
/* NOTE(review): gst_element_post_message() takes ownership of the message
 * it is given, and @msg is the message delivered to this watch; no
 * gst_message_ref() is visible -- confirm the reference count.  The same
 * applies to the other places below that post @msg. */
1223 if (!gst_element_post_message (GST_ELEMENT(demux), msg)) {
1224 GST_ERROR_OBJECT (demux, "failed to post error");
/* restart the download with the re-created pipeline */
1232 ret = gst_element_set_state (stream->pipe, GST_STATE_PLAYING);
1233 if (ret == GST_STATE_CHANGE_FAILURE) {
1234 if (!gst_element_post_message (GST_ELEMENT(demux), msg)) {
1235 GST_ERROR_OBJECT (demux, "failed to post error");
1242 g_print ("GST_MESSAGE_ERROR: error= %s\n", error->message);
1244 g_print ("GST_MESSAGE_ERROR: debug = %s\n", debug);
1245 if (!gst_element_post_message (GST_ELEMENT(demux), msg)) {
1246 GST_ERROR_OBJECT (demux, "failed to post error");
1247 gst_ss_demux_stop (demux, stream);
1250 g_error_free(error);
1253 gst_ss_demux_stop (demux, stream);
1258 g_error_free( error);
1261 case GST_MESSAGE_BUFFERING: {
1263 int total_cache_perc = 0;
1264 int active_stream_cnt = 0;
1265 GstSSDemuxStream *cur_stream = NULL;
1266 int avg_percent = 0;
1268 /* update buffer percent */
1269 gst_message_parse_buffering (msg, &stream->rcvd_percent);
1270 gchar *name = gst_element_get_name (GST_MESSAGE_SRC (msg));
1271 GST_LOG_OBJECT (stream->pad, "Internal bus : Buffering from %s = %d\n", name, stream->rcvd_percent);
1274 // TODO: need to check for better logic
/* sum the last received percent of the streams in demux->streams[] */
1275 for (n = 0; n < SS_STREAM_NUM; n++) {
1276 cur_stream = demux->streams[n];
1278 active_stream_cnt++;
1279 total_cache_perc += cur_stream->rcvd_percent;
/* integer average over the counted streams */
1283 avg_percent = total_cache_perc / active_stream_cnt;
1285 GST_LOG_OBJECT (demux, "avg buffering completed = %d", avg_percent);
1287 if (avg_percent > 100)
1290 // TODO: need to add mutex for protecting percent
/* only act when the average differs from the value stored on the demux */
1291 if (avg_percent != demux->percent) {
1292 demux->percent = avg_percent;
1293 GST_LOG_OBJECT (demux, "#########Posting %d buffering msg to main bus ###########", demux->percent);
1295 gst_element_post_message (GST_ELEMENT (demux), gst_message_new_buffering (GST_OBJECT (demux), avg_percent));
1299 case GST_MESSAGE_WARNING: {
1301 GError* error = NULL;
1302 gst_message_parse_warning(msg, &error, &debug);
1303 GST_WARNING_OBJECT(demux, "warning : %s\n", error->message);
1304 GST_WARNING_OBJECT(demux, "debug : %s\n", debug);
1305 g_error_free( error );
1310 GST_LOG_OBJECT(demux, "unhandled message : %s\n", gst_message_type_get_name (GST_MESSAGE_TYPE (msg)));
/*
 * gst_ss_demux_update_buffering:
 * @stream: stream whose buffering state is updated
 * @percent: current fill level, compared against demux->low_percent and
 *           demux->high_percent
 *
 * Updates stream->is_buffering: a buffering stream leaves that state once
 * @percent reaches demux->high_percent, and a non-buffering stream enters
 * it when @percent drops below demux->low_percent.  The value is then
 * rescaled so that high_percent corresponds to 100 and, when it differs
 * from stream->percent, stored and posted as a buffering message with
 * stream->sink as the source.
 */
1319 gst_ss_demux_update_buffering (GstSSDemuxStream *stream, guint64 percent)
/* NOTE(review): no visible line assigns or tests do_post -- presumably it
 * gates the posting section below; confirm. */
1321 gboolean do_post = FALSE;
1322 GstSSDemux *demux = stream->parent;
1324 if (stream->is_buffering) {
1326 if (percent >= demux->high_percent)
1327 stream->is_buffering = FALSE;
1329 if (percent < demux->low_percent) {
1330 stream->is_buffering = TRUE;
1336 GstMessage *message;
/* NOTE(review): mode and buffering_left are not referenced by any other
 * visible line. */
1337 GstBufferingMode mode;
1338 gint64 buffering_left = -1;
/* scale so that high_percent becomes the 100% mark.
 * NOTE(review): divides by demux->high_percent; confirm it is never 0. */
1340 percent = percent * 100 / demux->high_percent;
/* post only when the value actually changed */
1345 if (percent != stream->percent) {
1346 stream->percent = percent;
1348 GST_DEBUG_OBJECT (stream->pad, "buffering %d percent", (gint) percent);
1349 g_print ("'%s' buffering %d percent done\n", stream->name, (gint) percent);
1351 /* posting buffering to internal bus, which will take average & post to main bus */
1352 message = gst_message_new_buffering (GST_OBJECT_CAST (stream->sink), (gint) percent);
1353 gst_element_post_message (GST_ELEMENT_CAST (stream->sink), message);
/*
 * gst_ssm_demux_on_new_buffer:
 * @appsink: the appsink to pull the buffer from
 * @data: the GstSSDemuxStream the buffer belongs to
 *
 * Pulls one buffer from @appsink and, under stream->queue_lock, adds its
 * size to stream->download_size, appends it to stream->queue and adds its
 * duration to stream->cached_duration.  The cached duration is converted
 * to a percentage of demux->max_cache_time and passed to
 * gst_ss_demux_update_buffering(); above 100 percent the function waits on
 * stream->queue_full.  The time between the two timestamps taken here is
 * added to stream->push_block_time, and stream->queue_empty is signalled
 * before the lock is released.
 *
 * An invalid buffer duration or a negative cached duration posts an element
 * error and releases the lock.
 */
1360 gst_ssm_demux_on_new_buffer (GstElement * appsink, void* data)
1362 GstSSDemuxStream *stream = (GstSSDemuxStream *)data;
1363 GstSSDemux *demux = stream->parent;
1364 GstBuffer *inbuf = NULL;
/* NOTE(review): fret and headbuf are not referenced by any other visible
 * line. */
1365 GstFlowReturn fret = GST_FLOW_OK;
1366 GstBuffer *headbuf = NULL;
1369 GTimeVal start = {0, };
1370 GTimeVal stop = {0, };
1371 guint64 push_start_time = 0;
1372 guint64 push_end_time =0;
1374 inbuf = gst_app_sink_pull_buffer ((GstAppSink *)appsink);
1376 GST_WARNING_OBJECT (demux, "Input buffer not available.,..\n");
1380 g_mutex_lock (stream->queue_lock);
/* bytes received for the current fragment; read by the EOS handler when it
 * computes the download rate */
1382 stream->download_size += GST_BUFFER_SIZE(inbuf);
1384 /* download rate calculation : note push_start_ts */
1385 g_get_current_time (&start);
/* timestamp in microseconds */
1386 push_start_time = (start.tv_sec * 1000000)+ start.tv_usec;
1388 GST_LOG_OBJECT (stream->pad, "Inbuf : size = %d, ts = %"GST_TIME_FORMAT", dur = %"GST_TIME_FORMAT,
1389 GST_BUFFER_SIZE(inbuf), GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(inbuf)), GST_TIME_ARGS(GST_BUFFER_DURATION(inbuf)));
/* NOTE(review): the buffer is queued before its duration is validated, so
 * a buffer with an invalid duration stays in the queue on the error path. */
1391 g_queue_push_tail (stream->queue, inbuf);
1393 if (GST_BUFFER_DURATION_IS_VALID (inbuf)) {
1394 stream->cached_duration += GST_BUFFER_DURATION(inbuf);
1396 g_print ("\nDuration field is not valid.. check this issue !!!!!!!!\n");
1397 GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid duration of a frame"), (NULL));
1398 g_mutex_unlock (stream->queue_lock);
1402 if (stream->cached_duration >= 0) {
/* fill level as a percentage of the configured cache time.
 * NOTE(review): divides by demux->max_cache_time; confirm it is never 0. */
1403 percent = (stream->cached_duration * 100) / demux->max_cache_time;
1404 //g_print ("[%s] percent done = %d[%"G_GINT64_FORMAT"]\n", ssm_parse_get_stream_name(stream->type), percent, percent);
1406 // TODO: need to decide, whether to call before wait or after ??
1407 gst_ss_demux_update_buffering (stream, percent);
1409 if (percent > 100) {
1410 /* update buffering & wait if space is not available */
1411 GST_DEBUG_OBJECT (stream->pad, "Reached more than 100 percent, queue full & wait till free");
/* g_cond_wait() releases queue_lock while waiting and re-acquires it
 * before returning.
 * NOTE(review): the wait is issued once with no visible loop re-checking
 * the fill level, so a spurious wakeup would continue immediately. */
1412 g_cond_wait(stream->queue_full, stream->queue_lock);
1413 GST_DEBUG_OBJECT (stream->pad,"Received signal to add more data...");
1416 g_print ("cached duration can not be negative\n\n\n");
1417 GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid cached duration"), (NULL));
1418 g_mutex_unlock (stream->queue_lock);
1422 /* download rate calculation : note push_stop_ts */
1423 g_get_current_time (&stop);
1424 push_end_time = (stop.tv_sec * 1000000)+ stop.tv_usec;
/* everything between the two timestamps, including any wait above, is
 * counted as time spent pushing rather than downloading */
1426 stream->push_block_time += push_end_time - push_start_time;
/* signal stream->queue_empty now that a buffer has been queued; the
 * waiting side is not part of this function */
1428 g_cond_signal (stream->queue_empty);
1430 g_mutex_unlock (stream->queue_lock);
/*
 * gst_ss_demux_stop:
 * @demux: owning demuxer (not referenced in the visible body)
 * @stream: stream whose task is stopped
 *
 * Calls gst_task_stop() on stream->stream_task unless its state is already
 * GST_TASK_STOPPED.
 */
1435 gst_ss_demux_stop (GstSSDemux * demux, GstSSDemuxStream *stream)
1437 if (GST_TASK_STATE (stream->stream_task) != GST_TASK_STOPPED)
1438 gst_task_stop (stream->stream_task);
/*
 * gst_ss_demux_stream_init:
 * @demux: demuxer that will own the stream's source pad
 * @stream: stream structure to initialise
 * @stream_type: SS_STREAM_VIDEO, SS_STREAM_AUDIO or SS_STREAM_TEXT
 *
 * Allocates the stream's conditions, mutexes and buffer queue, resets its
 * counters, creates the source pad from the template matching
 * @stream_type, sets the caps obtained from ssm_parse_get_stream_caps() on
 * it, then activates the pad and adds it to @demux.
 */
1442 gst_ss_demux_stream_init (GstSSDemux *demux, GstSSDemuxStream *stream, SS_STREAM_TYPE stream_type)
/* synchronisation primitives and buffer queue; their counterparts are
 * released in gst_ss_demux_stream_free() */
1444 stream->cond = g_cond_new ();
1445 stream->lock = g_mutex_new ();
1446 stream->queue = g_queue_new ();
1447 stream->queue_full = g_cond_new ();
1448 stream->queue_empty = g_cond_new ();
1449 stream->queue_lock = g_mutex_new ();
1450 stream->parent = demux;
/* download pipeline elements are created later */
1451 stream->pipe = NULL;
1452 stream->urisrc = NULL;
1453 stream->parser = NULL;
1454 stream->sink = NULL;
1455 stream->frag_cnt = 0;
1456 stream->type = stream_type ;
1458 stream->start_ts = -1;
1459 stream->sent_ns = FALSE;
1460 stream->switch_ts = GST_CLOCK_TIME_NONE;
1461 stream->avg_dur = GST_CLOCK_TIME_NONE;
/* the last posted percent starts at 100 while the received percent
 * starts at 0 */
1462 stream->percent = 100;
1463 stream->rcvd_percent = 0;
1464 stream->push_block_time = 0;
1465 stream->cached_duration = 0;
1466 stream->download_start_ts = 0;
1467 stream->download_stop_ts = 0;
1468 stream->download_size = 0;
/* Pad and name depend on the stream type.
 * NOTE(review): no branch for other stream types is visible; in that case
 * stream->pad and stream->name would not be set here before the pad is
 * used below -- confirm callers only pass these three types. */
1470 if (stream->type == SS_STREAM_VIDEO) {
1471 stream->pad = gst_pad_new_from_static_template (&ssdemux_videosrc_template, "video");
1472 stream->name = g_strdup("video");
1473 } else if (stream->type == SS_STREAM_AUDIO) {
1474 stream->pad = gst_pad_new_from_static_template (&ssdemux_audiosrc_template, "audio");
1475 stream->name = g_strdup("audio");
1476 } else if (stream->type == SS_STREAM_TEXT) {
1477 stream->pad = gst_pad_new_from_static_template (&ssdemux_subsrc_template, "subtitle");
1478 stream->name = g_strdup("text");
/* store the stream on the pad as its element-private data */
1481 GST_PAD_ELEMENT_PRIVATE (stream->pad) = stream;
1483 gst_pad_use_fixed_caps (stream->pad);
1484 gst_pad_set_event_function (stream->pad, gst_ss_demux_handle_src_event);
1485 gst_pad_set_query_function (stream->pad, gst_ss_demux_handle_src_query);
1487 stream->caps = ssm_parse_get_stream_caps (demux->parser, stream->type);
/* NOTE(review): gst_caps_to_string() returns a newly allocated string; its
 * release is not visible here -- confirm it is freed.  The log text below
 * says "video" for every stream type. */
1488 gchar *caps_name = gst_caps_to_string(stream->caps);
1489 g_print ("prepare video caps = %s", caps_name);
1492 GST_DEBUG_OBJECT (demux, "setting caps %" GST_PTR_FORMAT, stream->caps);
1493 gst_pad_set_caps (stream->pad, stream->caps);
1495 GST_DEBUG_OBJECT (demux, "adding pad %s %p to demux %p", GST_OBJECT_NAME (stream->pad), stream->pad, demux);
/* the pad is activated before it is added to the element */
1496 gst_pad_set_active (stream->pad, TRUE);
1497 gst_element_add_pad (GST_ELEMENT_CAST (demux), stream->pad);
/*
 * gst_ss_demux_stream_free:
 * @demux: demuxer the stream's pad was added to
 * @stream: stream to tear down
 *
 * Unrefs every buffer still held in stream->queue and frees the queue,
 * removes the stream's pad from @demux, and frees the conditions and
 * mutexes, clearing each pointer after release.
 *
 * NOTE(review): stream->name (g_strdup'ed in gst_ss_demux_stream_init) and
 * stream->caps are not released by any visible line of this function --
 * confirm they are freed elsewhere.
 */
1501 gst_ss_demux_stream_free (GstSSDemux * demux, GstSSDemuxStream * stream)
1503 if (stream->queue) {
/* drop the buffers that were queued but never pushed downstream */
1504 while (!g_queue_is_empty(stream->queue)) {
1505 gst_buffer_unref (g_queue_pop_head (stream->queue));
1507 g_queue_free (stream->queue);
1508 stream->queue = NULL;
1512 gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->pad);
1516 g_cond_free (stream->cond);
1517 stream->cond = NULL;
1520 g_mutex_free (stream->lock);
1521 stream->lock = NULL;
1523 if (stream->queue_lock) {
1524 g_mutex_free (stream->queue_lock);
1525 stream->queue_lock = NULL;
1527 if (stream->queue_full) {
1528 g_cond_free (stream->queue_full);
1529 stream->queue_full = NULL;
1531 if (stream->queue_empty) {
1532 g_cond_free (stream->queue_empty);
1533 stream->queue_empty= NULL;
/*
 * ssdemux_init:
 * @plugin: plugin being loaded
 *
 * Plugin entry point passed to GST_PLUGIN_DEFINE: registers the "ssdemux"
 * element type (GST_TYPE_SS_DEMUX) with rank GST_RANK_PRIMARY.  The
 * trailing "|| FALSE" does not change the result of the condition.
 */
1538 ssdemux_init (GstPlugin * plugin)
1540 if (!gst_element_register (plugin, "ssdemux", GST_RANK_PRIMARY,
1541 GST_TYPE_SS_DEMUX) || FALSE)
/* Plugin descriptor: description string, ssdemux_init as the init function,
 * then version, license ("LGPL"), package name and origin URL. */
1546 GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
1549 "Smooth streaming demux plugin",
1550 ssdemux_init, VERSION, "LGPL", PACKAGE_NAME, "http://www.samsung.com/")