pad: Fix race condition causing the same probe to be called multiple times
authorSebastian Dröge <sebastian@centricular.com>
Wed, 9 May 2018 21:05:51 +0000 (00:05 +0300)
committerSebastian Dröge <sebastian@centricular.com>
Fri, 11 May 2018 15:54:55 +0000 (18:54 +0300)
Probes were remembering a cookie that was used to check if the probe was
already called this time before the probes list changed. However the
same probes could've been called by another thread in between and thus
gotten a new cookie, and would then be called a second time.

https://bugzilla.gnome.org/show_bug.cgi?id=795987

gst/gstpad.c

index 02885acc14bebc853ecbdef4314b4f31b811c9a5..1297afcfac111b0c25abbaefc3f4362b009c859e 100644 (file)
@@ -143,7 +143,6 @@ struct _GstPadPrivate
 
   gint using;
   guint probe_list_cookie;
-  guint probe_cookie;
 
   /* counter of how many idle probes are running directly from the add_probe
    * call. Used to block any data flowing in the pad while the idle callback
@@ -159,10 +158,8 @@ struct _GstPadPrivate
 typedef struct
 {
   GHook hook;
-  guint cookie;
 } GstProbe;
 
-#define PROBE_COOKIE(h) (((GstProbe *)(h))->cookie)
 #define GST_PAD_IS_RUNNING_IDLE_PROBE(p) \
     (((GstPad *)(p))->priv->idle_running > 0)
 
@@ -174,7 +171,11 @@ typedef struct
   gboolean pass;
   gboolean handled;
   gboolean marshalled;
-  guint cookie;
+
+  GHook **called_probes;
+  guint n_called_probes;
+  guint called_probes_size;
+  gboolean retry;
 } ProbeMarshall;
 
 static void gst_pad_dispose (GObject * object);
@@ -1456,7 +1457,6 @@ gst_pad_add_probe (GstPad * pad, GstPadProbeType mask,
   hook->func = callback;
   hook->data = user_data;
   hook->destroy = destroy_data;
-  PROBE_COOKIE (hook) = (pad->priv->probe_cookie - 1);
 
   /* add the probe */
   g_hook_append (&pad->probes, hook);
@@ -3460,6 +3460,8 @@ gst_pad_query_default (GstPad * pad, GstObject * parent, GstQuery * query)
   return ret;
 }
 
+#define N_STACK_ALLOCATE_PROBES (16)
+
 static void
 probe_hook_marshal (GHook * hook, ProbeMarshall * data)
 {
@@ -3469,16 +3471,36 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
   GstPadProbeCallback callback;
   GstPadProbeReturn ret;
   gpointer original_data;
+  guint i;
 
-  /* if we have called this callback, do nothing */
-  if (PROBE_COOKIE (hook) == data->cookie) {
-    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-        "hook %lu, cookie %u already called", hook->hook_id,
-        PROBE_COOKIE (hook));
-    return;
+  /* if we have called this callback, do nothing. But only check
+   * if we're actually calling probes a second time */
+  if (data->retry) {
+    for (i = 0; i < data->n_called_probes; i++) {
+      if (data->called_probes[i] == hook) {
+        GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+            "hook %lu already called", hook->hook_id);
+        return;
+      }
+    }
   }
 
-  PROBE_COOKIE (hook) = data->cookie;
+  /* reallocate on the heap if we had more than 16 probes */
+  if (data->n_called_probes == data->called_probes_size) {
+    if (data->called_probes_size > N_STACK_ALLOCATE_PROBES) {
+      data->called_probes_size *= 2;
+      data->called_probes =
+          g_renew (GHook *, data->called_probes, data->called_probes_size);
+    } else {
+      GHook **tmp = data->called_probes;
+
+      data->called_probes_size *= 2;
+      data->called_probes = g_new (GHook *, data->called_probes_size);
+      memcpy (data->called_probes, tmp,
+          N_STACK_ALLOCATE_PROBES * sizeof (GHook *));
+    }
+  }
+  data->called_probes[data->n_called_probes++] = hook;
 
   flags = hook->flags >> G_HOOK_FLAG_USER_SHIFT;
   type = info->type;
@@ -3526,8 +3548,7 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
     goto no_match;
 
   GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-      "hook %lu, cookie %u with flags 0x%08x matches", hook->hook_id,
-      PROBE_COOKIE (hook), flags);
+      "hook %lu with flags 0x%08x matches", hook->hook_id, flags);
 
   data->marshalled = TRUE;
 
@@ -3583,8 +3604,8 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
 no_match:
   {
     GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-        "hook %lu, cookie %u with flags 0x%08x does not match %08x",
-        hook->hook_id, PROBE_COOKIE (hook), flags, info->type);
+        "hook %lu with flags 0x%08x does not match %08x",
+        hook->hook_id, flags, info->type);
     return;
   }
 }
@@ -3669,6 +3690,7 @@ do_probe_callbacks (GstPad * pad, GstPadProbeInfo * info,
   ProbeMarshall data;
   guint cookie;
   gboolean is_block;
+  GHook *called_probes[N_STACK_ALLOCATE_PROBES];
 
   data.pad = pad;
   data.info = info;
@@ -3676,7 +3698,14 @@ do_probe_callbacks (GstPad * pad, GstPadProbeInfo * info,
   data.handled = FALSE;
   data.marshalled = FALSE;
   data.dropped = FALSE;
-  data.cookie = ++pad->priv->probe_cookie;
+
+  /* We stack-allocate for N_STACK_ALLOCATE_PROBES hooks as a first step. If more are needed,
+   * we will re-allocate with g_malloc(). This should usually never be needed
+   */
+  data.called_probes = called_probes;
+  data.n_called_probes = 0;
+  data.called_probes_size = N_STACK_ALLOCATE_PROBES;
+  data.retry = FALSE;
 
   is_block =
       (info->type & GST_PAD_PROBE_TYPE_BLOCK) == GST_PAD_PROBE_TYPE_BLOCK;
@@ -3687,18 +3716,18 @@ do_probe_callbacks (GstPad * pad, GstPadProbeInfo * info,
   }
 
 again:
-  GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-      "do probes cookie %u", data.cookie);
+  GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "do probes");
   cookie = pad->priv->probe_list_cookie;
 
   g_hook_list_marshal (&pad->probes, TRUE,
       (GHookMarshaller) probe_hook_marshal, &data);
 
-  /* if the list changed, call the new callbacks (they will not have their
-   * cookie set to data.cookie */
+  /* if the list changed, call the new callbacks (they will not be in
+   * called_probes yet) */
   if (cookie != pad->priv->probe_list_cookie) {
     GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
         "probe list changed, restarting");
+    data.retry = TRUE;
     goto again;
   }
 
@@ -3740,11 +3769,12 @@ again:
       GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_BLOCKING);
       GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "We got unblocked");
 
-      /* if the list changed, call the new callbacks (they will not have their
-       * cookie set to data.cookie */
+      /* if the list changed, call the new callbacks (they will not be in
+       * called_probes yet) */
       if (cookie != pad->priv->probe_list_cookie) {
         GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
             "probe list changed, restarting");
+        data.retry = TRUE;
         goto again;
       }
 
@@ -3753,28 +3783,39 @@ again:
     }
   }
 
+  if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
+    g_free (data.called_probes);
+
   return defaultval;
 
   /* ERRORS */
 flushing:
   {
     GST_DEBUG_OBJECT (pad, "pad is flushing");
+    if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
+      g_free (data.called_probes);
     return GST_FLOW_FLUSHING;
   }
 dropped:
   {
     GST_DEBUG_OBJECT (pad, "data is dropped");
+    if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
+      g_free (data.called_probes);
     return GST_FLOW_CUSTOM_SUCCESS;
   }
 passed:
   {
     /* FIXME : Should we return FLOW_OK or the defaultval ?? */
     GST_DEBUG_OBJECT (pad, "data is passed");
+    if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
+      g_free (data.called_probes);
     return GST_FLOW_OK;
   }
 handled:
   {
     GST_DEBUG_OBJECT (pad, "data was handled");
+    if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
+      g_free (data.called_probes);
     return GST_FLOW_CUSTOM_SUCCESS_1;
   }
 }