nlesource: Wait for the seek to actualy happen before removing the probe
authorThibault Saunier <tsaunier@igalia.com>
Sun, 23 Jun 2019 03:49:50 +0000 (23:49 -0400)
committerThibault Saunier <tsaunier@igalia.com>
Fri, 5 Jul 2019 22:30:41 +0000 (18:30 -0400)
Make sure that an event resulting from the seek happens before removing
the pad probe, dropping anything while it is not the case.

This guarantees that the seek happens before `nlesource` outputs
anything. This was not necessary as with decodebin or usual source
flushing seeks lead to synchronous flush_start/flush_stop and we could
safely assume that once the seek is sent, it was happenning.
With nested `nlecomposition` this assumption is simply not true as
in the composition seeks are basically cached and happen later in
the composition updating thread.

This fixes races where we ended up removing the blocking probe before
the seek actually started to be executed in the nlecomposition
nested inside an nlesource which leaded to data from *before* the seek
to be outputed which means we could display wrong frames,
and it was leading to interesting deadlocks.

plugins/nle/nlesource.c

index 439dc38..36ad277 100644 (file)
@@ -60,6 +60,7 @@ struct _NleSourcePrivate
 
   GMutex seek_lock;
   GstEvent *seek_event;
+  guint32 flush_seqnum;
   gulong probeid;
 };
 
@@ -116,6 +117,24 @@ nle_source_class_init (NleSourceClass * klass)
 
 }
 
+static GstPadProbeReturn
+srcpad_probe_cb (GstPad * pad, GstPadProbeInfo * info, NleSource * source)
+{
+  GstEvent *event = info->data;
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_SEEK:
+      GST_OBJECT_LOCK (source);
+      source->priv->flush_seqnum = GST_EVENT_SEQNUM (event);
+      GST_DEBUG_OBJECT (pad, "Seek seqnum: %d", source->priv->flush_seqnum);
+      GST_OBJECT_UNLOCK (source);
+      break;
+    default:
+      break;
+  }
+
+  return GST_PAD_PROBE_OK;
+}
 
 static void
 nle_source_init (NleSource * source)
@@ -125,6 +144,10 @@ nle_source_init (NleSource * source)
   source->priv = nle_source_get_instance_private (source);
   g_mutex_init (&source->priv->seek_lock);
 
+  gst_pad_add_probe (NLE_OBJECT_SRC (source),
+      GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, (GstPadProbeCallback) srcpad_probe_cb,
+      source, NULL);
+
   GST_DEBUG_OBJECT (source, "Setting GstBin async-handling to TRUE");
   g_object_set (G_OBJECT (source), "async-handling", TRUE, NULL);
 }
@@ -441,34 +464,49 @@ ghost_seek_pad (GstElement * source, gpointer user_data)
     GstEvent *seek_event = priv->seek_event;
     priv->seek_event = NULL;
 
+    GST_INFO_OBJECT (source, "Sending seek: %" GST_PTR_FORMAT, seek_event);
+    GST_OBJECT_LOCK (source);
+    priv->flush_seqnum = GST_EVENT_SEQNUM (seek_event);
+    GST_OBJECT_UNLOCK (source);
     if (!(gst_pad_send_event (priv->ghostedpad, seek_event)))
       GST_ELEMENT_ERROR (source, RESOURCE, SEEK,
           (NULL), ("Sending initial seek to upstream element failed"));
   }
   g_mutex_unlock (&priv->seek_lock);
-
-  GST_OBJECT_LOCK (source);
-  if (priv->probeid) {
-    GST_DEBUG_OBJECT (source, "Removing blocking probe! %lu", priv->probeid);
-    priv->areblocked = FALSE;
-    gst_pad_remove_probe (priv->ghostedpad, priv->probeid);
-    priv->probeid = 0;
-  }
-  GST_OBJECT_UNLOCK (source);
 }
 
 static GstPadProbeReturn
-pad_blocked_cb (GstPad * pad, GstPadProbeInfo * info, NleSource * source)
+pad_brobe_cb (GstPad * pad, GstPadProbeInfo * info, NleSource * source)
 {
+  NleSourcePrivate *priv = source->priv;
+  GstPadProbeReturn res = GST_PAD_PROBE_OK;
+
   GST_OBJECT_LOCK (source);
-  if (!source->priv->areblocked) {
+  if (!priv->areblocked) {
     GST_INFO_OBJECT (pad, "Blocked now, launching seek");
+    priv->areblocked = TRUE;
     gst_element_call_async (GST_ELEMENT (source), ghost_seek_pad, NULL, NULL);
-    source->priv->areblocked = TRUE;
+    GST_OBJECT_UNLOCK (source);
+
+    return GST_PAD_PROBE_OK;
+  }
+
+  if (priv->probeid && GST_EVENT_SEQNUM (info->data) == priv->flush_seqnum) {
+    GST_INFO_OBJECT (source, "Seeking now done: %" GST_PTR_FORMAT
+        " - %d ? %d", info->data, GST_EVENT_SEQNUM (info->data),
+        priv->flush_seqnum);
+    priv->flush_seqnum = GST_SEQNUM_INVALID;
+    priv->areblocked = FALSE;
+    priv->probeid = 0;
+    res = GST_PAD_PROBE_REMOVE;
+  } else {
+    res = GST_PAD_PROBE_DROP;
+    GST_DEBUG_OBJECT (source, "Dropping %" GST_PTR_FORMAT " - %d ? %d",
+        info->data, GST_EVENT_SEQNUM (info->data), priv->flush_seqnum);
   }
   GST_OBJECT_UNLOCK (source);
 
-  return GST_PAD_PROBE_OK;
+  return res;
 }
 
 static gboolean
@@ -508,8 +546,9 @@ nle_source_prepare (NleObject * object)
     priv->ghostedpad = pad;
     GST_OBJECT_LOCK (source);
     priv->probeid = gst_pad_add_probe (pad,
-        GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
-        (GstPadProbeCallback) pad_blocked_cb, source, NULL);
+        GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
+        GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, (GstPadProbeCallback) pad_brobe_cb,
+        source, NULL);
     GST_OBJECT_UNLOCK (source);
     gst_object_unref (pad);
   }