oggdemux: do not send seek events from the streaming thread
authorVincent Penquerc'h <vincent.penquerch@collabora.co.uk>
Thu, 5 Mar 2015 17:42:53 +0000 (17:42 +0000)
committerVincent Penquerc'h <vincent.penquerch@collabora.co.uk>
Wed, 11 Mar 2015 14:40:49 +0000 (14:40 +0000)
This will usually deadlock, despite this patch being in master for
quite some time and working fine. Nevertheless, we deem it to be
not working, disregarding facts.

As such, we fix it by keeping track of seek events, and sending
them upstream from a separate thread. Buffers are then discarded
till we get a new segment with the expected seqnum.

ext/ogg/gstoggdemux.c
ext/ogg/gstoggdemux.h

index e1b913203fa3d2423a3fdba4bb1b56875b779382..88d8d14ba0a353a6a20af7533dc2327b7044e90b 100644 (file)
@@ -1449,22 +1449,19 @@ gst_ogg_demux_seek_back_after_push_duration_check_unlock (GstOggDemux * ogg)
 
   ogg->push_state = PUSH_PLAYING;
 
-  GST_PUSH_UNLOCK (ogg);
-
-  if (event) {
-    /* If there is one, perform it */
-    gst_ogg_demux_perform_seek_push (ogg, event);
-  } else {
-    /* If there wasn't, seek back at start to start normal playback */
+  /* If there is one, perform it. Otherwise, seek back at start to start
+   * normal playback  */
+  if (!event) {
     GST_INFO_OBJECT (ogg, "Seeking back to 0 after duration check");
     event = gst_event_new_seek (1.0, GST_FORMAT_BYTES,
         GST_SEEK_FLAG_ACCURATE | GST_SEEK_FLAG_FLUSH,
         GST_SEEK_TYPE_SET, 1, GST_SEEK_TYPE_SET, GST_CLOCK_TIME_NONE);
-    if (!gst_pad_push_event (ogg->sinkpad, event)) {
-      GST_WARNING_OBJECT (ogg, "Failed seeking back to start");
-      return GST_FLOW_ERROR;
-    }
   }
+  gst_event_replace (&ogg->seek_event, event);
+  GST_PUSH_UNLOCK (ogg);
+  g_mutex_lock (&ogg->seek_event_mutex);
+  g_cond_broadcast (&ogg->seek_event_cond);
+  g_mutex_unlock (&ogg->seek_event_mutex);
 
   return GST_FLOW_OK;
 }
@@ -1594,7 +1591,6 @@ gst_ogg_pad_handle_push_mode_state (GstOggPad * pad, ogg_page * page)
     GstClockTime t;
     gint64 best = -1;
     GstEvent *sevent;
-    int res;
     gboolean close_enough;
     float seek_quality;
 
@@ -1791,13 +1787,11 @@ gst_ogg_pad_handle_push_mode_state (GstOggPad * pad, ogg_page * page)
           GST_SEEK_TYPE_NONE, -1);
       gst_event_set_seqnum (sevent, ogg->seqnum);
 
+      gst_event_replace (&ogg->seek_event, sevent);
       GST_PUSH_UNLOCK (ogg);
-      res = gst_pad_push_event (ogg->sinkpad, sevent);
-      if (!res) {
-        /* We failed to send the seek event, notify the pipeline */
-        GST_ELEMENT_ERROR (ogg, RESOURCE, SEEK, (NULL), ("Failed to seek"));
-        return GST_FLOW_ERROR;
-      }
+      g_mutex_lock (&ogg->seek_event_mutex);
+      g_cond_broadcast (&ogg->seek_event_cond);
+      g_mutex_unlock (&ogg->seek_event_mutex);
       return GST_FLOW_SKIP_PUSH;
     }
 
@@ -2313,12 +2307,21 @@ gst_ogg_demux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
               gst_ogg_demux_reset_streams (ogg);
             }
           }
+
+          if (!ogg->pullmode) {
+            if (ogg->seek_event_drop_till == gst_event_get_seqnum (event)) {
+              GST_DEBUG_OBJECT (ogg, "Got event seqnum %u, stopping dropping",
+                  ogg->seek_event_drop_till);
+              ogg->seek_event_drop_till = 0;
+            }
+          }
           GST_PUSH_UNLOCK (ogg);
         } else {
           GST_WARNING_OBJECT (ogg, "unexpected segment format: %s",
               gst_format_get_name (segment.format));
         }
       }
+
       gst_event_unref (event);
       res = TRUE;
       break;
@@ -3536,7 +3539,6 @@ gst_ogg_demux_get_duration_push (GstOggDemux * ogg, int flags)
   /* In push mode, we get to the end of the stream to get the duration */
   gint64 position;
   GstEvent *sevent;
-  gboolean res;
 
   /* A full Ogg page can be almost 64 KB. There's no guarantee that there'll be a
      granpos there, but it's fairly likely */
@@ -3551,16 +3553,11 @@ gst_ogg_demux_get_duration_push (GstOggDemux * ogg, int flags)
   /* do not read the last byte */
   sevent = gst_event_new_seek (1.0, GST_FORMAT_BYTES, flags, GST_SEEK_TYPE_SET,
       position, GST_SEEK_TYPE_SET, ogg->push_byte_length - 1);
-  res = gst_pad_push_event (ogg->sinkpad, sevent);
-  if (res) {
-    GST_DEBUG_OBJECT (ogg, "Seek succesful");
-    return TRUE;
-  } else {
-    GST_INFO_OBJECT (ogg, "Seek failed, duration will stay unknown");
-    ogg->push_state = PUSH_PLAYING;
-    ogg->push_disable_seeking = TRUE;
-    return FALSE;
-  }
+  gst_event_replace (&ogg->seek_event, sevent);
+  g_mutex_lock (&ogg->seek_event_mutex);
+  g_cond_broadcast (&ogg->seek_event_cond);
+  g_mutex_unlock (&ogg->seek_event_mutex);
+  return TRUE;
 }
 
 static gboolean
@@ -3766,8 +3763,11 @@ gst_ogg_demux_perform_seek_push (GstOggDemux * ogg, GstEvent * event)
       start_type, best, GST_SEEK_TYPE_NONE, -1);
   gst_event_set_seqnum (sevent, gst_event_get_seqnum (event));
 
+  gst_event_replace (&ogg->seek_event, sevent);
   GST_PUSH_UNLOCK (ogg);
-  res = gst_pad_push_event (ogg->sinkpad, sevent);
+  g_mutex_lock (&ogg->seek_event_mutex);
+  g_cond_broadcast (&ogg->seek_event_cond);
+  g_mutex_unlock (&ogg->seek_event_mutex);
 
   return res;
 
@@ -4434,9 +4434,19 @@ gst_ogg_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
   GstOggDemux *ogg;
   gint ret = 0;
   GstFlowReturn result = GST_FLOW_OK;
+  gboolean drop;
 
   ogg = GST_OGG_DEMUX (parent);
 
+  GST_PUSH_LOCK (ogg);
+  drop = (ogg->seek_event_drop_till > 0);
+  GST_PUSH_UNLOCK (ogg);
+  if (drop) {
+    GST_ERROR_OBJECT (ogg, "Dropping buffer because we have a pending seek");
+    gst_buffer_unref (buffer);
+    return GST_FLOW_OK;
+  }
+
   GST_DEBUG_OBJECT (ogg, "enter");
   result = gst_ogg_demux_submit_buffer (ogg, buffer);
   if (result < 0) {
@@ -4743,6 +4753,55 @@ pause:
   }
 }
 
+/* The sink pad task function for push mode.
+ * It just sends any seek events queued by the streaming thread.
+ */
+static gpointer
+gst_ogg_demux_loop_push (GstOggDemux * ogg)
+{
+  GstEvent *event;
+
+  while (1) {
+    g_mutex_lock (&ogg->seek_event_mutex);
+    if (ogg->seek_event_thread_stop) {
+      g_mutex_unlock (&ogg->seek_event_mutex);
+      break;
+    }
+    g_cond_wait (&ogg->seek_event_cond, &ogg->seek_event_mutex);
+    if (ogg->seek_event_thread_stop) {
+      g_mutex_unlock (&ogg->seek_event_mutex);
+      break;
+    }
+    g_mutex_unlock (&ogg->seek_event_mutex);
+
+    GST_PUSH_LOCK (ogg);
+    event = ogg->seek_event;
+    ogg->seek_event = NULL;
+    if (event) {
+      ogg->seek_event_drop_till = gst_event_get_seqnum (event);
+    }
+    GST_PUSH_UNLOCK (ogg);
+
+    if (!event)
+      continue;
+
+    GST_ERROR ("Pushing event %" GST_PTR_FORMAT, event);
+    if (!gst_pad_push_event (ogg->sinkpad, event)) {
+      GST_WARNING_OBJECT (ogg, "Failed to push event");
+      GST_PUSH_LOCK (ogg);
+      if (!ogg->pullmode) {
+        ogg->push_state = PUSH_PLAYING;
+        ogg->push_disable_seeking = TRUE;
+      }
+      GST_PUSH_UNLOCK (ogg);
+    } else {
+      GST_ERROR ("Pushed event ok");
+    }
+  }
+  gst_object_unref (ogg);
+  return NULL;
+}
+
 static void
 gst_ogg_demux_clear_chains (GstOggDemux * ogg)
 {
@@ -4815,6 +4874,22 @@ gst_ogg_demux_sink_activate_mode (GstPad * sinkpad, GstObject * parent,
     case GST_PAD_MODE_PUSH:
       ogg->pullmode = FALSE;
       ogg->resync = FALSE;
+      if (active) {
+        ogg->seek_event_thread_stop = FALSE;
+        g_mutex_init (&ogg->seek_event_mutex);
+        g_cond_init (&ogg->seek_event_cond);
+        ogg->seek_event_thread = g_thread_new ("seek_event_thread",
+            (GThreadFunc) gst_ogg_demux_loop_push, gst_object_ref (ogg));
+      } else {
+        g_mutex_lock (&ogg->seek_event_mutex);
+        ogg->seek_event_thread_stop = TRUE;
+        g_cond_broadcast (&ogg->seek_event_cond);
+        g_mutex_unlock (&ogg->seek_event_mutex);
+        g_thread_join (ogg->seek_event_thread);
+        g_cond_clear (&ogg->seek_event_cond);
+        g_mutex_clear (&ogg->seek_event_mutex);
+        ogg->seek_event_thread = NULL;
+      }
       res = TRUE;
       break;
     case GST_PAD_MODE_PULL:
index bb7948a606ff5fa6261614cf2366873bf6b20d82..742392f143699499a6462c70e7e9d627e280b48b 100644 (file)
@@ -200,6 +200,14 @@ struct _GstOggDemux
   /* ogg stuff */
   ogg_sync_state sync;
   long chunk_size;
+
+  /* Seek events set up by the streaming thread in push mode */
+  GstEvent *seek_event;
+  GThread *seek_event_thread;
+  GMutex seek_event_mutex;
+  GCond seek_event_cond;
+  gboolean seek_event_thread_stop;
+  guint32 seek_event_drop_till;
 };
 
 struct _GstOggDemuxClass