From cb7e3d1f3b66943aeda200b7fd9e0f9e877891c7 Mon Sep 17 00:00:00 2001 From: Thiago Santos Date: Fri, 28 Dec 2012 00:48:33 -0300 Subject: [PATCH] mssdemux: initial implementation of seek event handling Adds basic handling for seek in time events. Needs to cooperate with the downstream qtdemux so that it forwards the seeks and the corresponding newsegments --- ext/smoothstreaming/gstmssdemux.c | 156 +++++++++++++++++++++++++-- ext/smoothstreaming/gstmssdemux.h | 3 + ext/smoothstreaming/gstmssmanifest.c | 77 +++++++++++++ ext/smoothstreaming/gstmssmanifest.h | 3 + 4 files changed, 231 insertions(+), 8 deletions(-) diff --git a/ext/smoothstreaming/gstmssdemux.c b/ext/smoothstreaming/gstmssdemux.c index d2a68f7068..75a9804511 100644 --- a/ext/smoothstreaming/gstmssdemux.c +++ b/ext/smoothstreaming/gstmssdemux.c @@ -164,6 +164,12 @@ gst_mss_demux_stream_free (GstMssDemuxStream * stream) stream->stream_task = NULL; } + if (stream->pending_newsegment) { + gst_event_unref (stream->pending_newsegment); + stream->pending_newsegment = NULL; + } + + if (stream->downloader != NULL) { g_object_unref (stream->downloader); stream->downloader = NULL; @@ -264,6 +270,20 @@ gst_mss_demux_start (GstMssDemux * mssdemux) } } +static gboolean +gst_mss_demux_push_src_event (GstMssDemux * mssdemux, GstEvent * event) +{ + GSList *iter; + gboolean ret = TRUE; + + for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { + GstMssDemuxStream *stream = iter->data; + gst_event_ref (event); + ret = ret & gst_pad_push_event (stream->pad, event); + } + return ret; +} + static gboolean gst_mss_demux_event (GstPad * pad, GstEvent * event) { @@ -295,6 +315,101 @@ gst_mss_demux_event (GstPad * pad, GstEvent * event) return ret; } +static gboolean +gst_mss_demux_src_event (GstPad * pad, GstEvent * event) +{ + GstMssDemux *mssdemux; + + mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad)); + + switch (event->type) { + case GST_EVENT_SEEK: + { + gdouble rate; + GstFormat format; + GstSeekFlags flags; + GstSeekType start_type, stop_type; + gint64 start, stop; + GstEvent *newsegment; + GSList *iter; + + GST_INFO_OBJECT (mssdemux, "Received GST_EVENT_SEEK"); + + gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start, + &stop_type, &stop); + + if (format != GST_FORMAT_TIME) + return FALSE; + + GST_DEBUG_OBJECT (mssdemux, + "seek event, rate: %f start: %" GST_TIME_FORMAT " stop: %" + GST_TIME_FORMAT, rate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop)); + + if (flags & GST_SEEK_FLAG_FLUSH) { + GstEvent *flush = gst_event_new_flush_start (); + GST_DEBUG_OBJECT (mssdemux, "sending flush start"); + + gst_event_set_seqnum (flush, gst_event_get_seqnum (event)); + gst_mss_demux_push_src_event (mssdemux, flush); + gst_event_unref (flush); + } + + /* stop the tasks */ + for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { + GstMssDemuxStream *stream = iter->data; + + gst_uri_downloader_cancel (stream->downloader); + gst_task_pause (stream->stream_task); + } + for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { + GstMssDemuxStream *stream = iter->data; + g_static_rec_mutex_lock (&stream->stream_lock); + } + + if (!gst_mss_manifest_seek (mssdemux->manifest, start)) {; + GST_WARNING_OBJECT (mssdemux, "Could not find seeked fragment"); + return FALSE; + } + + newsegment = + gst_event_new_new_segment (FALSE, rate, format, start, stop, start); + gst_event_set_seqnum (newsegment, gst_event_get_seqnum (event)); + for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { + GstMssDemuxStream *stream = iter->data; + + stream->pending_newsegment = gst_event_ref (newsegment); + } + gst_event_unref (newsegment); + + if (flags & GST_SEEK_FLAG_FLUSH) { + GstEvent *flush = gst_event_new_flush_stop (); + GST_DEBUG_OBJECT (mssdemux, "sending flush stop"); + + gst_event_set_seqnum (flush, gst_event_get_seqnum (event)); + gst_mss_demux_push_src_event (mssdemux, flush); + gst_event_unref (flush); + } + + /* restart tasks */ + for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { + GstMssDemuxStream *stream = iter->data; + g_static_rec_mutex_unlock (&stream->stream_lock); + } + for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) { + GstMssDemuxStream *stream = iter->data; + + gst_task_start (stream->stream_task); + } + + return TRUE; + } + default: + break; + } + + return gst_pad_event_default (pad, event); +} + static gboolean gst_mss_demux_src_query (GstPad * pad, GstQuery * query) { @@ -313,14 +428,8 @@ gst_mss_demux_src_query (GstPad * pad, GstQuery * query) gst_query_parse_duration (query, &fmt, NULL); if (fmt == GST_FORMAT_TIME && mssdemux->manifest) { - /* TODO should we use the streams accumulated duration? */ - guint64 dur = gst_mss_manifest_get_duration (mssdemux->manifest); - guint64 timescale = gst_mss_manifest_get_timescale (mssdemux->manifest); - - if (dur != -1 && timescale != -1) - duration = - (GstClockTime) gst_util_uint64_scale_round (dur, GST_SECOND, - timescale); + /* TODO should we use the streams accumulated duration or the main manifest duration? */ + duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest); if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) { gst_query_set_duration (query, GST_FORMAT_TIME, duration); @@ -335,6 +444,25 @@ gst_mss_demux_src_query (GstPad * pad, GstQuery * query) gst_query_set_latency (query, FALSE, 0, -1); ret = TRUE; break; + case GST_QUERY_SEEKING:{ + GstFormat fmt; + gint64 stop = -1; + + gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL); + GST_INFO_OBJECT (mssdemux, "Received GST_QUERY_SEEKING with format %d", + fmt); + if (fmt == GST_FORMAT_TIME) { + GstClockTime duration; + duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest); + if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) + stop = duration; + gst_query_set_seeking (query, fmt, TRUE, 0, stop); + ret = TRUE; + GST_INFO_OBJECT (mssdemux, "GST_QUERY_SEEKING returning with stop : %" + GST_TIME_FORMAT, GST_TIME_ARGS (stop)); + } + break; + } default: /* Don't fordward queries upstream because of the special nature of this * "demuxer", which relies on the upstream element only to be fed @@ -350,6 +478,7 @@ static void _set_src_pad_functions (GstPad * pad) { gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_query)); + gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_event)); } static void @@ -508,10 +637,21 @@ gst_mss_demux_stream_loop (GstMssDemuxStream * stream) g_free (path); g_free (url); + if (!fragment) { + GST_INFO_OBJECT (mssdemux, "No fragment downloaded"); + /* TODO check if we are truly stoping */ + return; + } + buffer = gst_fragment_get_buffer (fragment); buffer = gst_buffer_make_metadata_writable (buffer); gst_buffer_set_caps (buffer, GST_PAD_CAPS (stream->pad)); + if (G_UNLIKELY (stream->pending_newsegment)) { + gst_pad_push_event (stream->pad, stream->pending_newsegment); + stream->pending_newsegment = NULL; + } + ret = gst_pad_push (stream->pad, buffer); switch (ret) { case GST_FLOW_UNEXPECTED: diff --git a/ext/smoothstreaming/gstmssdemux.h b/ext/smoothstreaming/gstmssdemux.h index 52c2377e00..9dee169e88 100644 --- a/ext/smoothstreaming/gstmssdemux.h +++ b/ext/smoothstreaming/gstmssdemux.h @@ -59,6 +59,8 @@ struct _GstMssDemuxStream { GstUriDownloader *downloader; + GstEvent *pending_newsegment; + /* Streaming task */ GstTask *stream_task; GStaticRecMutex stream_lock; @@ -78,6 +80,7 @@ struct _GstMssDemux { GSList *streams; guint n_videos; guint n_audios; + }; struct _GstMssDemuxClass { diff --git a/ext/smoothstreaming/gstmssmanifest.c b/ext/smoothstreaming/gstmssmanifest.c index 34c36f69e9..8385a9f0af 100644 --- a/ext/smoothstreaming/gstmssmanifest.c +++ b/ext/smoothstreaming/gstmssmanifest.c @@ -496,6 +496,28 @@ gst_mss_manifest_get_duration (GstMssManifest * manifest) return dur; } + +/** + * Gets the duration in nanoseconds + */ +GstClockTime +gst_mss_manifest_get_gst_duration (GstMssManifest * manifest) +{ + guint64 duration = -1; + guint64 timescale; + GstClockTime gstdur = GST_CLOCK_TIME_NONE; + + duration = gst_mss_manifest_get_duration (manifest); + timescale = gst_mss_manifest_get_timescale (manifest); + + if (duration != -1 && timescale != -1) + gstdur = + (GstClockTime) gst_util_uint64_scale_round (duration, GST_SECOND, + timescale); + + return gstdur; +} + GstCaps * gst_mss_stream_get_caps (GstMssStream * stream) { @@ -565,3 +587,58 @@ gst_mss_stream_type_name (GstMssStreamType streamtype) return "unknown"; } } + +/** + * Seeks all streams to the fragment that contains the set time + * + * @time: time in nanoseconds + */ +gboolean +gst_mss_manifest_seek (GstMssManifest * manifest, guint64 time) +{ + gboolean ret = TRUE; + GSList *iter; + + for (iter = manifest->streams; iter; iter = g_slist_next (iter)) { + ret = gst_mss_stream_seek (iter->data, time) & ret; + } + + return ret; +} + +/** + * Seeks this stream to the fragment that contains the sample at time + * + * @time: time in nanoseconds + */ +gboolean +gst_mss_stream_seek (GstMssStream * stream, guint64 time) +{ + GList *iter; + guint64 timescale; + + timescale = gst_mss_stream_get_timescale (stream); + time = gst_util_uint64_scale_round (time, timescale, GST_SECOND); + + for (iter = stream->fragments; iter; iter = g_list_next (iter)) { + GList *next = g_list_next (iter); + if (next) { + GstMssStreamFragment *fragment = next->data; + + if (fragment->time > time) { + stream->current_fragment = iter; + break; + } + } else { + GstMssStreamFragment *fragment = iter->data; + if (fragment->time + fragment->duration > time) { + stream->current_fragment = iter; + } else { + stream->current_fragment = NULL; /* EOS */ + } + break; + } + } + + return TRUE; +} diff --git a/ext/smoothstreaming/gstmssmanifest.h b/ext/smoothstreaming/gstmssmanifest.h index 280bdaeebf..b75b60e1ae 100644 --- a/ext/smoothstreaming/gstmssmanifest.h +++ b/ext/smoothstreaming/gstmssmanifest.h @@ -43,12 +43,15 @@ void gst_mss_manifest_free (GstMssManifest * manifest); GSList * gst_mss_manifest_get_streams (GstMssManifest * manifest); guint64 gst_mss_manifest_get_timescale (GstMssManifest * manifest); guint64 gst_mss_manifest_get_duration (GstMssManifest * manifest); +GstClockTime gst_mss_manifest_get_gst_duration (GstMssManifest * manifest); +gboolean gst_mss_manifest_seek (GstMssManifest * manifest, guint64 time); GstMssStreamType gst_mss_stream_get_type (GstMssStream *stream); GstCaps * gst_mss_stream_get_caps (GstMssStream * stream); guint64 gst_mss_stream_get_timescale (GstMssStream * stream); GstFlowReturn gst_mss_stream_get_fragment_url (GstMssStream * stream, gchar ** url); GstFlowReturn gst_mss_stream_advance_fragment (GstMssStream * stream); +gboolean gst_mss_stream_seek (GstMssStream * stream, guint64 time); const gchar * gst_mss_stream_type_name (GstMssStreamType streamtype); -- 2.34.1