mssdemux: initial implementation of seek event handling
authorThiago Santos <thiago.sousa.santos@collabora.com>
Fri, 28 Dec 2012 03:48:33 +0000 (00:48 -0300)
committerThiago Santos <thiago.sousa.santos@collabora.com>
Wed, 8 May 2013 00:05:11 +0000 (21:05 -0300)
Adds basic handling for seek in time events. Needs to cooperate
with the downstream qtdemux so that it forwards the seeks and
the corresponding newsegments

ext/smoothstreaming/gstmssdemux.c
ext/smoothstreaming/gstmssdemux.h
ext/smoothstreaming/gstmssmanifest.c
ext/smoothstreaming/gstmssmanifest.h

index d2a68f706821904dbc2cf19e681ed0580a8e4fc2..75a98045114f9a5820acb7b9003183869a27c458 100644 (file)
@@ -164,6 +164,12 @@ gst_mss_demux_stream_free (GstMssDemuxStream * stream)
     stream->stream_task = NULL;
   }
 
+  if (stream->pending_newsegment) {
+    gst_event_unref (stream->pending_newsegment);
+    stream->pending_newsegment = NULL;
+  }
+
+
   if (stream->downloader != NULL) {
     g_object_unref (stream->downloader);
     stream->downloader = NULL;
@@ -264,6 +270,20 @@ gst_mss_demux_start (GstMssDemux * mssdemux)
   }
 }
 
+static gboolean
+gst_mss_demux_push_src_event (GstMssDemux * mssdemux, GstEvent * event)
+{
+  GSList *iter;
+  gboolean ret = TRUE;
+
+  for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+    GstMssDemuxStream *stream = iter->data;
+    gst_event_ref (event);
+    ret = ret & gst_pad_push_event (stream->pad, event);
+  }
+  return ret;
+}
+
 static gboolean
 gst_mss_demux_event (GstPad * pad, GstEvent * event)
 {
@@ -295,6 +315,101 @@ gst_mss_demux_event (GstPad * pad, GstEvent * event)
   return ret;
 }
 
+static gboolean
+gst_mss_demux_src_event (GstPad * pad, GstEvent * event)
+{
+  GstMssDemux *mssdemux;
+
+  mssdemux = GST_MSS_DEMUX (GST_PAD_PARENT (pad));
+
+  switch (event->type) {
+    case GST_EVENT_SEEK:
+    {
+      gdouble rate;
+      GstFormat format;
+      GstSeekFlags flags;
+      GstSeekType start_type, stop_type;
+      gint64 start, stop;
+      GstEvent *newsegment;
+      GSList *iter;
+
+      GST_INFO_OBJECT (mssdemux, "Received GST_EVENT_SEEK");
+
+      gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
+          &stop_type, &stop);
+
+      if (format != GST_FORMAT_TIME)
+        return FALSE;
+
+      GST_DEBUG_OBJECT (mssdemux,
+          "seek event, rate: %f start: %" GST_TIME_FORMAT " stop: %"
+          GST_TIME_FORMAT, rate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop));
+
+      if (flags & GST_SEEK_FLAG_FLUSH) {
+        GstEvent *flush = gst_event_new_flush_start ();
+        GST_DEBUG_OBJECT (mssdemux, "sending flush start");
+
+        gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
+        gst_mss_demux_push_src_event (mssdemux, flush);
+        gst_event_unref (flush);
+      }
+
+      /* stop the tasks */
+      for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+        GstMssDemuxStream *stream = iter->data;
+
+        gst_uri_downloader_cancel (stream->downloader);
+        gst_task_pause (stream->stream_task);
+      }
+      for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+        GstMssDemuxStream *stream = iter->data;
+        g_static_rec_mutex_lock (&stream->stream_lock);
+      }
+
+      if (!gst_mss_manifest_seek (mssdemux->manifest, start)) {;
+        GST_WARNING_OBJECT (mssdemux, "Could not find seeked fragment");
+        return FALSE;
+      }
+
+      newsegment =
+          gst_event_new_new_segment (FALSE, rate, format, start, stop, start);
+      gst_event_set_seqnum (newsegment, gst_event_get_seqnum (event));
+      for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+        GstMssDemuxStream *stream = iter->data;
+
+        stream->pending_newsegment = gst_event_ref (newsegment);
+      }
+      gst_event_unref (newsegment);
+
+      if (flags & GST_SEEK_FLAG_FLUSH) {
+        GstEvent *flush = gst_event_new_flush_stop ();
+        GST_DEBUG_OBJECT (mssdemux, "sending flush stop");
+
+        gst_event_set_seqnum (flush, gst_event_get_seqnum (event));
+        gst_mss_demux_push_src_event (mssdemux, flush);
+        gst_event_unref (flush);
+      }
+
+      /* restart tasks */
+      for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+        GstMssDemuxStream *stream = iter->data;
+        g_static_rec_mutex_unlock (&stream->stream_lock);
+      }
+      for (iter = mssdemux->streams; iter; iter = g_slist_next (iter)) {
+        GstMssDemuxStream *stream = iter->data;
+
+        gst_task_start (stream->stream_task);
+      }
+
+      return TRUE;
+    }
+    default:
+      break;
+  }
+
+  return gst_pad_event_default (pad, event);
+}
+
 static gboolean
 gst_mss_demux_src_query (GstPad * pad, GstQuery * query)
 {
@@ -313,14 +428,8 @@ gst_mss_demux_src_query (GstPad * pad, GstQuery * query)
 
       gst_query_parse_duration (query, &fmt, NULL);
       if (fmt == GST_FORMAT_TIME && mssdemux->manifest) {
-        /* TODO should we use the streams accumulated duration? */
-        guint64 dur = gst_mss_manifest_get_duration (mssdemux->manifest);
-        guint64 timescale = gst_mss_manifest_get_timescale (mssdemux->manifest);
-
-        if (dur != -1 && timescale != -1)
-          duration =
-              (GstClockTime) gst_util_uint64_scale_round (dur, GST_SECOND,
-              timescale);
+        /* TODO should we use the streams accumulated duration or the main manifest duration? */
+        duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
 
         if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0) {
           gst_query_set_duration (query, GST_FORMAT_TIME, duration);
@@ -335,6 +444,25 @@ gst_mss_demux_src_query (GstPad * pad, GstQuery * query)
       gst_query_set_latency (query, FALSE, 0, -1);
       ret = TRUE;
       break;
+    case GST_QUERY_SEEKING:{
+      GstFormat fmt;
+      gint64 stop = -1;
+
+      gst_query_parse_seeking (query, &fmt, NULL, NULL, NULL);
+      GST_INFO_OBJECT (mssdemux, "Received GST_QUERY_SEEKING with format %d",
+          fmt);
+      if (fmt == GST_FORMAT_TIME) {
+        GstClockTime duration;
+        duration = gst_mss_manifest_get_gst_duration (mssdemux->manifest);
+        if (GST_CLOCK_TIME_IS_VALID (duration) && duration > 0)
+          stop = duration;
+        gst_query_set_seeking (query, fmt, TRUE, 0, stop);
+        ret = TRUE;
+        GST_INFO_OBJECT (mssdemux, "GST_QUERY_SEEKING returning with stop : %"
+            GST_TIME_FORMAT, GST_TIME_ARGS (stop));
+      }
+      break;
+    }
     default:
       /* Don't fordward queries upstream because of the special nature of this
        *  "demuxer", which relies on the upstream element only to be fed
@@ -350,6 +478,7 @@ static void
 _set_src_pad_functions (GstPad * pad)
 {
   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_query));
+  gst_pad_set_event_function (pad, GST_DEBUG_FUNCPTR (gst_mss_demux_src_event));
 }
 
 static void
@@ -508,10 +637,21 @@ gst_mss_demux_stream_loop (GstMssDemuxStream * stream)
   g_free (path);
   g_free (url);
 
+  if (!fragment) {
+    GST_INFO_OBJECT (mssdemux, "No fragment downloaded");
+    /* TODO check if we are truly stoping */
+    return;
+  }
+
   buffer = gst_fragment_get_buffer (fragment);
   buffer = gst_buffer_make_metadata_writable (buffer);
   gst_buffer_set_caps (buffer, GST_PAD_CAPS (stream->pad));
 
+  if (G_UNLIKELY (stream->pending_newsegment)) {
+    gst_pad_push_event (stream->pad, stream->pending_newsegment);
+    stream->pending_newsegment = NULL;
+  }
+
   ret = gst_pad_push (stream->pad, buffer);
   switch (ret) {
     case GST_FLOW_UNEXPECTED:
index 52c2377e002c8014fe0aae8f69403637012c06db..9dee169e880c0b988b971b7e54d01350905297af 100644 (file)
@@ -59,6 +59,8 @@ struct _GstMssDemuxStream {
 
   GstUriDownloader *downloader;
 
+  GstEvent *pending_newsegment;
+
   /* Streaming task */
   GstTask *stream_task;
   GStaticRecMutex stream_lock;
@@ -78,6 +80,7 @@ struct _GstMssDemux {
   GSList *streams;
   guint n_videos;
   guint n_audios;
+
 };
 
 struct _GstMssDemuxClass {
index 34c36f69e9cb32fab5d52a2d6a0c1966de8265bf..8385a9f0afe5a35baadf6fd19b5a5c4d8945ab21 100644 (file)
@@ -496,6 +496,28 @@ gst_mss_manifest_get_duration (GstMssManifest * manifest)
   return dur;
 }
 
+
+/**
+ * Gets the duration in nanoseconds
+ */
+GstClockTime
+gst_mss_manifest_get_gst_duration (GstMssManifest * manifest)
+{
+  guint64 duration = -1;
+  guint64 timescale;
+  GstClockTime gstdur = GST_CLOCK_TIME_NONE;
+
+  duration = gst_mss_manifest_get_duration (manifest);
+  timescale = gst_mss_manifest_get_timescale (manifest);
+
+  if (duration != -1 && timescale != -1)
+    gstdur =
+        (GstClockTime) gst_util_uint64_scale_round (duration, GST_SECOND,
+        timescale);
+
+  return gstdur;
+}
+
 GstCaps *
 gst_mss_stream_get_caps (GstMssStream * stream)
 {
@@ -565,3 +587,58 @@ gst_mss_stream_type_name (GstMssStreamType streamtype)
       return "unknown";
   }
 }
+
+/**
+ * Seeks all streams to the fragment that contains the set time
+ *
+ * @time: time in nanoseconds
+ */
+gboolean
+gst_mss_manifest_seek (GstMssManifest * manifest, guint64 time)
+{
+  gboolean ret = TRUE;
+  GSList *iter;
+
+  for (iter = manifest->streams; iter; iter = g_slist_next (iter)) {
+    ret = gst_mss_stream_seek (iter->data, time) & ret;
+  }
+
+  return ret;
+}
+
+/**
+ * Seeks this stream to the fragment that contains the sample at time
+ *
+ * @time: time in nanoseconds
+ */
+gboolean
+gst_mss_stream_seek (GstMssStream * stream, guint64 time)
+{
+  GList *iter;
+  guint64 timescale;
+
+  timescale = gst_mss_stream_get_timescale (stream);
+  time = gst_util_uint64_scale_round (time, timescale, GST_SECOND);
+
+  for (iter = stream->fragments; iter; iter = g_list_next (iter)) {
+    GList *next = g_list_next (iter);
+    if (next) {
+      GstMssStreamFragment *fragment = next->data;
+
+      if (fragment->time > time) {
+        stream->current_fragment = iter;
+        break;
+      }
+    } else {
+      GstMssStreamFragment *fragment = iter->data;
+      if (fragment->time + fragment->duration > time) {
+        stream->current_fragment = iter;
+      } else {
+        stream->current_fragment = NULL;        /* EOS */
+      }
+      break;
+    }
+  }
+
+  return TRUE;
+}
index 280bdaeebf956bf5feabeca5c638b237578789d5..b75b60e1aec9e9448b7b3777b60d8da3df336a62 100644 (file)
@@ -43,12 +43,15 @@ void gst_mss_manifest_free (GstMssManifest * manifest);
 GSList * gst_mss_manifest_get_streams (GstMssManifest * manifest);
 guint64 gst_mss_manifest_get_timescale (GstMssManifest * manifest);
 guint64 gst_mss_manifest_get_duration (GstMssManifest * manifest);
+GstClockTime gst_mss_manifest_get_gst_duration (GstMssManifest * manifest);
+gboolean gst_mss_manifest_seek (GstMssManifest * manifest, guint64 time);
 
 GstMssStreamType gst_mss_stream_get_type (GstMssStream *stream);
 GstCaps * gst_mss_stream_get_caps (GstMssStream * stream);
 guint64 gst_mss_stream_get_timescale (GstMssStream * stream);
 GstFlowReturn gst_mss_stream_get_fragment_url (GstMssStream * stream, gchar ** url);
 GstFlowReturn gst_mss_stream_advance_fragment (GstMssStream * stream);
+gboolean gst_mss_stream_seek (GstMssStream * stream, guint64 time);
 
 const gchar * gst_mss_stream_type_name (GstMssStreamType streamtype);