oggdemux: Move seeking in pull mode to the streaming thread
authorThibault Saunier <tsaunier@igalia.com>
Thu, 29 Aug 2019 15:16:39 +0000 (11:16 -0400)
committerThibault Saunier <tsaunier@gnome.org>
Fri, 6 Sep 2019 15:06:53 +0000 (15:06 +0000)
Flushing and teering down the streaming thread from the seeking thread
and simply letting the streaming thread handle the seek event in its
loop function.

Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/issues/639

ext/ogg/gstoggdemux.c
ext/ogg/gstoggdemux.h

index d09ab8b..e88e325 100644 (file)
@@ -3516,7 +3516,7 @@ gst_ogg_demux_perform_seek_pull (GstOggDemux * ogg, GstEvent * event)
 {
   GstOggChain *chain = NULL;
   gboolean res;
-  gboolean flush, accurate, keyframe;
+  gboolean accurate, keyframe;
   GstFormat format;
   gdouble rate;
   GstSeekFlags flags;
@@ -3524,7 +3524,6 @@ gst_ogg_demux_perform_seek_pull (GstOggDemux * ogg, GstEvent * event)
   gint64 start, stop;
   gboolean update;
   guint32 seqnum;
-  GstEvent *tevent;
 
   if (event) {
     GST_DEBUG_OBJECT (ogg, "seek with event");
@@ -3548,41 +3547,10 @@ gst_ogg_demux_perform_seek_pull (GstOggDemux * ogg, GstEvent * event)
 
   GST_DEBUG_OBJECT (ogg, "seek, rate %g", rate);
 
-  flush = flags & GST_SEEK_FLAG_FLUSH;
   accurate = flags & GST_SEEK_FLAG_ACCURATE;
   keyframe = flags & GST_SEEK_FLAG_KEY_UNIT;
 
-  /* first step is to unlock the streaming thread if it is
-   * blocked in a chain call, we do this by starting the flush. because
-   * we cannot yet hold any streaming lock, we have to protect the chains
-   * with their own lock. */
-  if (flush) {
-    gint i;
-
-    tevent = gst_event_new_flush_start ();
-    gst_event_set_seqnum (tevent, seqnum);
-
-    gst_event_ref (tevent);
-    gst_pad_push_event (ogg->sinkpad, tevent);
-
-    GST_CHAIN_LOCK (ogg);
-    for (i = 0; i < ogg->chains->len; i++) {
-      GstOggChain *chain = g_array_index (ogg->chains, GstOggChain *, i);
-      gint j;
-
-      for (j = 0; j < chain->streams->len; j++) {
-        GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, j);
-
-        gst_event_ref (tevent);
-        gst_pad_push_event (GST_PAD (pad), tevent);
-      }
-    }
-    GST_CHAIN_UNLOCK (ogg);
-
-    gst_event_unref (tevent);
-  } else {
-    gst_pad_pause_task (ogg->sinkpad);
-  }
+  gst_pad_pause_task (ogg->sinkpad);
 
   /* now grab the stream lock so that streaming cannot continue, for
    * non flushing seeks when the element is in PAUSED this could block
@@ -3598,14 +3566,6 @@ gst_ogg_demux_perform_seek_pull (GstOggDemux * ogg, GstEvent * event)
       GST_TIME_FORMAT, GST_TIME_ARGS (ogg->segment.start),
       GST_TIME_ARGS (ogg->segment.stop));
 
-  /* we need to stop flushing on the srcpad as we're going to use it
-   * next. We can do this as we have the STREAM lock now. */
-  if (flush) {
-    tevent = gst_event_new_flush_stop (TRUE);
-    gst_event_set_seqnum (tevent, seqnum);
-    gst_pad_push_event (ogg->sinkpad, tevent);
-  }
-
   {
     gint i;
 
@@ -3640,13 +3600,6 @@ gst_ogg_demux_perform_seek_pull (GstOggDemux * ogg, GstEvent * event)
     gint64 position, begin_time;
     GstSegment segment;
 
-    /* we have to send the flush to the old chain, not the new one */
-    if (flush) {
-      tevent = gst_event_new_flush_stop (TRUE);
-      gst_event_set_seqnum (tevent, seqnum);
-      gst_ogg_demux_send_event (ogg, tevent);
-    }
-
     /* we need this to see how far inside the chain we need to start */
     if (chain->begin_time != GST_CLOCK_TIME_NONE)
       begin_time = chain->begin_time;
@@ -3725,19 +3678,24 @@ gst_ogg_demux_perform_seek_pull (GstOggDemux * ogg, GstEvent * event)
   /* streaming can continue now */
   GST_PAD_STREAM_UNLOCK (ogg->sinkpad);
 
+done:
+  if (event)
+    gst_event_unref (event);
   return res;
 
   /* ERRORS */
 error:
   {
     GST_DEBUG_OBJECT (ogg, "seek failed");
-    return FALSE;
+    res = FALSE;
+    goto done;
   }
 no_chain:
   {
     GST_DEBUG_OBJECT (ogg, "no chain to seek in");
     GST_PAD_STREAM_UNLOCK (ogg->sinkpad);
-    return FALSE;
+    res = FALSE;
+    goto done;
   }
 }
 
@@ -3993,12 +3951,79 @@ error_locked:
 }
 
 static gboolean
+gst_ogg_demux_setup_seek_pull (GstOggDemux * ogg, GstEvent * event)
+{
+  gboolean flush;
+  GstSeekFlags flags;
+  GstEvent *tevent;
+  guint32 seqnum = gst_event_get_seqnum (event);
+
+  GST_DEBUG_OBJECT (ogg, "Scheduling seek: %" GST_PTR_FORMAT, event);
+  gst_event_parse_seek (event, NULL, NULL, &flags, NULL, NULL, NULL, NULL);
+
+  flush = flags & GST_SEEK_FLAG_FLUSH;
+
+  /* first step is to unlock the streaming thread if it is
+   * blocked in a chain call, we do this by starting the flush. because
+   * we cannot yet hold any streaming lock, we have to protect the chains
+   * with their own lock. */
+  if (flush) {
+    gint i;
+
+    tevent = gst_event_new_flush_start ();
+    gst_event_set_seqnum (tevent, seqnum);
+
+    gst_event_ref (tevent);
+    gst_pad_push_event (ogg->sinkpad, tevent);
+
+    GST_CHAIN_LOCK (ogg);
+    for (i = 0; i < ogg->chains->len; i++) {
+      GstOggChain *chain = g_array_index (ogg->chains, GstOggChain *, i);
+      gint j;
+
+      for (j = 0; j < chain->streams->len; j++) {
+        GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, j);
+
+        gst_event_ref (tevent);
+        gst_pad_push_event (GST_PAD (pad), tevent);
+      }
+    }
+    GST_CHAIN_UNLOCK (ogg);
+
+    gst_event_unref (tevent);
+  }
+
+  gst_pad_pause_task (ogg->sinkpad);
+
+  /* now grab the stream lock so that streaming cannot continue, for
+   * non flushing seeks when the element is in PAUSED this could block
+   * forever. */
+  GST_PAD_STREAM_LOCK (ogg->sinkpad);
+
+  /* we need to stop flushing on the sinkpad as we're going to use it
+   * next. We can do this as we have the STREAM lock now. */
+  if (flush) {
+    tevent = gst_event_new_flush_stop (TRUE);
+    gst_event_set_seqnum (tevent, seqnum);
+    gst_pad_push_event (ogg->sinkpad, gst_event_ref (tevent));
+    gst_ogg_demux_send_event (ogg, tevent);
+  }
+
+  gst_event_replace (&ogg->seek_event, event);
+  gst_pad_start_task (ogg->sinkpad, (GstTaskFunction) gst_ogg_demux_loop,
+      ogg->sinkpad, NULL);
+  GST_PAD_STREAM_UNLOCK (ogg->sinkpad);
+
+  return TRUE;
+}
+
+static gboolean
 gst_ogg_demux_perform_seek (GstOggDemux * ogg, GstEvent * event)
 {
   gboolean res;
 
   if (ogg->pullmode) {
-    res = gst_ogg_demux_perform_seek_pull (ogg, event);
+    res = gst_ogg_demux_setup_seek_pull (ogg, event);
   } else {
     res = gst_ogg_demux_perform_seek_push (ogg, event);
   }
@@ -4862,12 +4887,15 @@ static void
 gst_ogg_demux_loop (GstOggPad * pad)
 {
   GstOggDemux *ogg;
+  gboolean res;
   GstFlowReturn ret;
+  GstEvent *seek;
 
   ogg = GST_OGG_DEMUX (GST_OBJECT_PARENT (pad));
+  seek = ogg->seek_event;
+  ogg->seek_event = NULL;
 
   if (ogg->need_chains) {
-    gboolean res;
 
     /* this is the only place where we write chains and thus need to lock. */
     GST_CHAIN_LOCK (ogg);
@@ -4883,10 +4911,14 @@ gst_ogg_demux_loop (GstOggPad * pad)
     GST_OBJECT_UNLOCK (ogg);
 
     /* and seek to configured positions without FLUSH */
-    res = gst_ogg_demux_perform_seek_pull (ogg, NULL);
+    res = gst_ogg_demux_perform_seek_pull (ogg, seek);
 
     if (!res)
       goto seek_failed;
+  } else if (seek) {
+    res = gst_ogg_demux_perform_seek_pull (ogg, seek);
+    if (!res)
+      goto seek_failed;
   }
 
   if (ogg->segment.rate >= 0.0)
index 3fe7fff..5ac81b7 100644 (file)
@@ -203,7 +203,7 @@ struct _GstOggDemux
   ogg_sync_state sync;
   long chunk_size;
 
-  /* Seek events set up by the streaming thread in push mode */
+  /* Seek events set up by the streaming thread */
   GstEvent *seek_event;
   GThread *seek_event_thread;
   GMutex seek_event_mutex;