buffer: handle gst_buffer_map failures
[platform/upstream/gstreamer.git] / libs / gst / check / gstharness.c
index b878b2b..7a6d2aa 100644 (file)
@@ -20,6 +20,7 @@
 
 /**
  * SECTION:gstharness
+ * @title: GstHarness
  * @short_description: A test-harness for writing GStreamer unit tests
  * @see_also: #GstTestClock,\
  *
@@ -30,7 +31,7 @@
  * The basic structure of #GstHarness is two "floating" #GstPads that connect
  * to the harnessed #GstElement src and sink #GstPads like so:
  *
- * <programlisting>
+ * |[
  *           __________________________
  *  _____   |  _____            _____  |   _____
  * |     |  | |     |          |     | |  |     |
@@ -38,7 +39,7 @@
  * |_____|  | |_____|          |_____| |  |_____|
  *          |__________________________|
  *
- * </programlisting>
+ * ]|
  *
  * With this, you can now simulate any environment the #GstElement might find
  * itself in. By specifying the #GstCaps of the harness #GstPads, using
  * then pull them out to examine them with gst_harness_pull() and
  * gst_harness_pull_event().
  *
- * <example>
- * <title>A simple buffer-in buffer-out example</title>
- *   <programlisting language="c">
- *   #include &lt;gst/gst.h&gt;
- *   #include &lt;gst/check/gstharness.h&gt;
+ * ## A simple buffer-in buffer-out example
+ *
+ * |[<!-- language="C" -->
+ *   #include <gst/gst.h>
+ *   #include <gst/check/gstharness.h>
  *   GstHarness *h;
  *   GstBuffer *in_buf;
  *   GstBuffer *out_buf;
@@ -85,8 +86,7 @@
  *   gst_buffer_unref (out_buf);
  *   gst_harness_teardown (h);
  *
- *   </programlisting>
- * </example>
+ *   ]|
  *
  * Another main feature of the #GstHarness is its integration with the
  * #GstTestClock. Operating the #GstTestClock can be very challenging, but
  * src-element (videotestsrc) and an encoder (vp8enc) to feed the decoder data
  * with different configurations, by simply doing:
  *
- * <example>
- * <programlisting language="c">
+ * |[<!-- language="C" -->
  *   GstHarness * h = gst_harness_new (h, "vp8dec");
  *   gst_harness_add_src_parse (h, "videotestsrc is-live=1 ! vp8enc", TRUE);
- * </programlisting>
- * </example>
+ * ]|
  *
  * and then feeding it data with:
  *
- * <example>
- * <programlisting language="c">
+ * |[<!-- language="C" -->
  * gst_harness_push_from_src (h);
- * </programlisting>
- * </example>
+ * ]|
  *
  */
 #ifdef HAVE_CONFIG_H
@@ -139,6 +135,8 @@ static void gst_harness_stress_free (GstHarnessThread * t);
 
 #define HARNESS_KEY "harness"
 #define HARNESS_REF "harness-ref"
+#define HARNESS_LOCK(h) g_mutex_lock (&(h)->priv->priv_mutex)
+#define HARNESS_UNLOCK(h) g_mutex_unlock (&(h)->priv->priv_mutex)
 
 static GstStaticPadTemplate hsrctemplate = GST_STATIC_PAD_TEMPLATE ("src",
     GST_PAD_SRC,
@@ -159,6 +157,7 @@ struct _GstHarnessPrivate
 
   gboolean forwarding;
   GstPad *sink_forward_pad;
+  GstTestClock *testclock;
 
   volatile gint recv_buffers;
   volatile gint recv_events;
@@ -183,6 +182,7 @@ struct _GstHarnessPrivate
   gboolean blocking_push_mode;
   GCond blocking_push_cond;
   GMutex blocking_push_mutex;
+  GMutex priv_mutex;
 
   GPtrArray *stress;
 };
@@ -227,6 +227,7 @@ gst_harness_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 {
   GstHarness *h = g_object_get_data (G_OBJECT (pad), HARNESS_KEY);
   GstHarnessPrivate *priv = h->priv;
+  gboolean ret = TRUE;
   gboolean forward;
 
   g_assert (h != NULL);
@@ -244,13 +245,19 @@ gst_harness_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       break;
   }
 
+  HARNESS_LOCK (h);
   if (priv->forwarding && forward && priv->sink_forward_pad) {
-    gst_pad_push_event (priv->sink_forward_pad, event);
+    GstPad *fwdpad = gst_object_ref (priv->sink_forward_pad);
+    HARNESS_UNLOCK (h);
+    ret = gst_pad_push_event (fwdpad, event);
+    gst_object_unref (fwdpad);
+    HARNESS_LOCK (h);
   } else {
     g_async_queue_push (priv->sink_event_queue, event);
   }
+  HARNESS_UNLOCK (h);
 
-  return TRUE;
+  return ret;
 }
 
 static void
@@ -362,11 +369,14 @@ gst_harness_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
       break;
     case GST_QUERY_ALLOCATION:
     {
+      HARNESS_LOCK (h);
       if (priv->forwarding && priv->sink_forward_pad != NULL) {
         GstPad *peer = gst_pad_get_peer (priv->sink_forward_pad);
         g_assert (peer != NULL);
+        HARNESS_UNLOCK (h);
         res = gst_pad_query (peer, query);
         gst_object_unref (peer);
+        HARNESS_LOCK (h);
       } else {
         GstCaps *caps;
         gboolean need_pool;
@@ -383,6 +393,7 @@ gst_harness_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
         GST_DEBUG_OBJECT (pad, "proposing allocation %" GST_PTR_FORMAT,
             priv->propose_allocator);
       }
+      HARNESS_UNLOCK (h);
       break;
     }
     default:
@@ -603,39 +614,23 @@ gst_pad_is_request_pad (GstPad * pad)
 }
 
 /**
- * gst_harness_new_full: (skip)
- * @element: a #GstElement to attach the harness to (transfer none)
- * @hsrc: (allow-none): a #GstStaticPadTemplate describing the harness srcpad.
- * %NULL will not create a harness srcpad.
- * @element_sinkpad_name: (allow-none): a #gchar with the name of the element
- * sinkpad that is then linked to the harness srcpad. Can be a static or request
- * or a sometimes pad that has been added. %NULL will not get/request a sinkpad
- * from the element. (Like if the element is a src.)
- * @hsink: (allow-none): a #GstStaticPadTemplate describing the harness sinkpad.
- * %NULL will not create a harness sinkpad.
- * @element_srcpad_name: (allow-none): a #gchar with the name of the element
- * srcpad that is then linked to the harness sinkpad, similar to the
- * @element_sinkpad_name.
+ * gst_harness_new_empty: (skip)
  *
- * Creates a new harness.
+ * Creates a new empty harness. Use gst_harness_add_element_full() to add
+ * an #GstElement to it.
  *
  * MT safe.
  *
  * Returns: (transfer full): a #GstHarness, or %NULL if the harness could
  * not be created
  *
- * Since: 1.6
+ * Since: 1.8
  */
 GstHarness *
-gst_harness_new_full (GstElement * element,
-    GstStaticPadTemplate * hsrc, const gchar * element_sinkpad_name,
-    GstStaticPadTemplate * hsink, const gchar * element_srcpad_name)
+gst_harness_new_empty (void)
 {
   GstHarness *h;
   GstHarnessPrivate *priv;
-  gboolean has_sinkpad, has_srcpad;
-
-  g_return_val_if_fail (element != NULL, NULL);
 
   h = g_new0 (GstHarness, 1);
   g_assert (h != NULL);
@@ -643,18 +638,63 @@ gst_harness_new_full (GstElement * element,
   priv = h->priv;
 
   GST_DEBUG_OBJECT (h, "about to create new harness %p", h);
-  h->element = gst_object_ref (element);
   priv->last_push_ts = GST_CLOCK_TIME_NONE;
   priv->latency_min = 0;
   priv->latency_max = GST_CLOCK_TIME_NONE;
   priv->drop_buffers = FALSE;
+  priv->testclock = GST_TEST_CLOCK_CAST (gst_test_clock_new ());
 
   priv->propose_allocator = NULL;
   gst_allocation_params_init (&priv->propose_allocation_params);
 
   g_mutex_init (&priv->blocking_push_mutex);
   g_cond_init (&priv->blocking_push_cond);
+  g_mutex_init (&priv->priv_mutex);
+
+  priv->stress = g_ptr_array_new_with_free_func (
+      (GDestroyNotify) gst_harness_stress_free);
 
+  /* we have forwarding on as a default */
+  gst_harness_set_forwarding (h, TRUE);
+
+  return h;
+}
+
+/**
+ * gst_harness_add_element_full: (skip)
+ * @h: a #GstHarness
+ * @element: a #GstElement to add to the harness (transfer none)
+ * @hsrc: (allow-none): a #GstStaticPadTemplate describing the harness srcpad.
+ * %NULL will not create a harness srcpad.
+ * @element_sinkpad_name: (allow-none): a #gchar with the name of the element
+ * sinkpad that is then linked to the harness srcpad. Can be a static or request
+ * or a sometimes pad that has been added. %NULL will not get/request a sinkpad
+ * from the element. (Like if the element is a src.)
+ * @hsink: (allow-none): a #GstStaticPadTemplate describing the harness sinkpad.
+ * %NULL will not create a harness sinkpad.
+ * @element_srcpad_name: (allow-none): a #gchar with the name of the element
+ * srcpad that is then linked to the harness sinkpad, similar to the
+ * @element_sinkpad_name.
+ *
+ * Adds a #GstElement to an empty #GstHarness
+ *
+ * MT safe.
+ *
+ * Since: 1.6
+ */
+void
+gst_harness_add_element_full (GstHarness * h, GstElement * element,
+    GstStaticPadTemplate * hsrc, const gchar * element_sinkpad_name,
+    GstStaticPadTemplate * hsink, const gchar * element_srcpad_name)
+{
+  GstClock *element_clock;
+  gboolean has_sinkpad, has_srcpad;
+
+  g_return_if_fail (element != NULL);
+  g_return_if_fail (h->element == NULL);
+
+  element_clock = GST_ELEMENT_CLOCK (element);
+  h->element = gst_object_ref (element);
   check_element_type (element, &has_sinkpad, &has_srcpad);
 
   /* setup the loose srcpad linked to the element sinkpad */
@@ -679,23 +719,62 @@ gst_harness_new_full (GstElement * element,
     g_free (stream_id);
   }
 
+  /* if the element already has a testclock attached,
+     we replace our own with it, if no clock we attach the testclock */
+  if (element_clock) {
+    if (GST_IS_TEST_CLOCK (element_clock)) {
+      gst_object_replace ((GstObject **) & h->priv->testclock,
+          (GstObject *) GST_ELEMENT_CLOCK (element));
+    }
+  } else {
+    gst_harness_use_testclock (h);
+  }
+
   /* don't start sources, they start producing data! */
   if (has_sinkpad)
     gst_harness_play (h);
 
   gst_harness_element_ref (h);
 
-  GST_DEBUG_OBJECT (h, "created new harness %p "
+  GST_DEBUG_OBJECT (h, "added element to harness %p "
       "with element_srcpad_name (%p, %s, %s) and element_sinkpad_name (%p, %s, %s)",
       h, h->srcpad, GST_DEBUG_PAD_NAME (h->srcpad),
       h->sinkpad, GST_DEBUG_PAD_NAME (h->sinkpad));
+}
 
-  priv->stress = g_ptr_array_new_with_free_func (
-      (GDestroyNotify) gst_harness_stress_free);
-
-  /* we have forwarding on as a default */
-  gst_harness_set_forwarding (h, TRUE);
-
+/**
+ * gst_harness_new_full: (skip)
+ * @element: a #GstElement to attach the harness to (transfer none)
+ * @hsrc: (allow-none): a #GstStaticPadTemplate describing the harness srcpad.
+ * %NULL will not create a harness srcpad.
+ * @element_sinkpad_name: (allow-none): a #gchar with the name of the element
+ * sinkpad that is then linked to the harness srcpad. Can be a static or request
+ * or a sometimes pad that has been added. %NULL will not get/request a sinkpad
+ * from the element. (Like if the element is a src.)
+ * @hsink: (allow-none): a #GstStaticPadTemplate describing the harness sinkpad.
+ * %NULL will not create a harness sinkpad.
+ * @element_srcpad_name: (allow-none): a #gchar with the name of the element
+ * srcpad that is then linked to the harness sinkpad, similar to the
+ * @element_sinkpad_name.
+ *
+ * Creates a new harness.
+ *
+ * MT safe.
+ *
+ * Returns: (transfer full): a #GstHarness, or %NULL if the harness could
+ * not be created
+ *
+ * Since: 1.6
+ */
+GstHarness *
+gst_harness_new_full (GstElement * element,
+    GstStaticPadTemplate * hsrc, const gchar * element_sinkpad_name,
+    GstStaticPadTemplate * hsink, const gchar * element_srcpad_name)
+{
+  GstHarness *h;
+  h = gst_harness_new_empty ();
+  gst_harness_add_element_full (h, element,
+      hsrc, element_sinkpad_name, hsink, element_srcpad_name);
   return h;
 }
 
@@ -814,30 +893,27 @@ gst_harness_new (const gchar * element_name)
 }
 
 /**
- * gst_harness_new_parse: (skip)
+ * gst_harness_add_parse: (skip)
+ * @h: a #GstHarness
  * @launchline: a #gchar describing a gst-launch type line
  *
- * Creates a new harness, parsing the @launchline and putting that in a #GstBin,
- * and then attches the harness to the bin.
+ * Parses the @launchline and puts that in a #GstBin,
+ * and then attches the supplied #GstHarness to the bin.
  *
  * MT safe.
  *
- * Returns: (transfer full): a #GstHarness, or %NULL if the harness could
- * not be created
- *
  * Since: 1.6
  */
-GstHarness *
-gst_harness_new_parse (const gchar * launchline)
+void
+gst_harness_add_parse (GstHarness * h, const gchar * launchline)
 {
-  GstHarness *h;
   GstBin *bin;
   gchar *desc;
   GstPad *pad;
   GstIterator *iter;
   gboolean done = FALSE;
 
-  g_return_val_if_fail (launchline != NULL, NULL);
+  g_return_if_fail (launchline != NULL);
 
   desc = g_strdup_printf ("bin.( %s )", launchline);
   bin =
@@ -845,7 +921,7 @@ gst_harness_new_parse (const gchar * launchline)
   g_free (desc);
 
   if (G_UNLIKELY (bin == NULL))
-    return NULL;
+    return;
 
   /* find pads and ghost them if necessary */
   if ((pad = gst_bin_find_unlinked_pad (bin, GST_PAD_SRC)) != NULL) {
@@ -875,15 +951,37 @@ gst_harness_new_parse (const gchar * launchline)
       case GST_ITERATOR_ERROR:
         gst_object_unref (bin);
         gst_iterator_free (iter);
-        g_return_val_if_reached (NULL);
+        g_return_if_reached ();
         break;
     }
   }
   gst_iterator_free (iter);
 
-  h = gst_harness_new_full (GST_ELEMENT_CAST (bin),
+  gst_harness_add_element_full (h, GST_ELEMENT_CAST (bin),
       &hsrctemplate, "sink", &hsinktemplate, "src");
   gst_object_unref (bin);
+}
+
+/**
+ * gst_harness_new_parse: (skip)
+ * @launchline: a #gchar describing a gst-launch type line
+ *
+ * Creates a new harness, parsing the @launchline and putting that in a #GstBin,
+ * and then attches the harness to the bin.
+ *
+ * MT safe.
+ *
+ * Returns: (transfer full): a #GstHarness, or %NULL if the harness could
+ * not be created
+ *
+ * Since: 1.6
+ */
+GstHarness *
+gst_harness_new_parse (const gchar * launchline)
+{
+  GstHarness *h;
+  h = gst_harness_new_empty ();
+  gst_harness_add_parse (h, launchline);
   return h;
 }
 
@@ -964,10 +1062,14 @@ gst_harness_teardown (GstHarness * h)
 
   g_cond_clear (&priv->blocking_push_cond);
   g_mutex_clear (&priv->blocking_push_mutex);
+  g_mutex_clear (&priv->priv_mutex);
 
   g_ptr_array_unref (priv->stress);
 
   gst_object_unref (h->element);
+
+  gst_object_replace ((GstObject **) & priv->testclock, NULL);
+
   g_free (h->priv);
   g_free (h);
 }
@@ -1174,10 +1276,7 @@ gst_harness_use_systemclock (GstHarness * h)
 void
 gst_harness_use_testclock (GstHarness * h)
 {
-  GstClock *clock = gst_test_clock_new ();
-  g_assert (clock != NULL);
-  gst_element_set_clock (h->element, clock);
-  gst_object_unref (clock);
+  gst_element_set_clock (h->element, GST_CLOCK_CAST (h->priv->testclock));
 }
 
 /**
@@ -1197,17 +1296,7 @@ gst_harness_use_testclock (GstHarness * h)
 GstTestClock *
 gst_harness_get_testclock (GstHarness * h)
 {
-  GstTestClock *testclock = NULL;
-  GstClock *clock;
-
-  clock = gst_element_get_clock (h->element);
-  if (clock) {
-    if (GST_IS_TEST_CLOCK (clock))
-      testclock = GST_TEST_CLOCK (clock);
-    else
-      gst_object_unref (clock);
-  }
-  return testclock;
+  return gst_object_ref (h->priv->testclock);
 }
 
 /**
@@ -1226,13 +1315,7 @@ gst_harness_get_testclock (GstHarness * h)
 gboolean
 gst_harness_set_time (GstHarness * h, GstClockTime time)
 {
-  GstTestClock *testclock;
-  testclock = gst_harness_get_testclock (h);
-  if (testclock == NULL)
-    return FALSE;
-
-  gst_test_clock_set_time (testclock, time);
-  gst_object_unref (testclock);
+  gst_test_clock_set_time (h->priv->testclock, time);
   return TRUE;
 }
 
@@ -1258,13 +1341,10 @@ gst_harness_set_time (GstHarness * h, GstClockTime time)
 gboolean
 gst_harness_wait_for_clock_id_waits (GstHarness * h, guint waits, guint timeout)
 {
-  GstTestClock *testclock = gst_harness_get_testclock (h);
+  GstTestClock *testclock = h->priv->testclock;
   gint64 start_time;
   gboolean ret;
 
-  if (testclock == NULL)
-    return FALSE;
-
   start_time = g_get_monotonic_time ();
   while (gst_test_clock_peek_id_count (testclock) < waits) {
     gint64 time_spent;
@@ -1277,7 +1357,6 @@ gst_harness_wait_for_clock_id_waits (GstHarness * h, guint waits, guint timeout)
 
   ret = (waits == gst_test_clock_peek_id_count (testclock));
 
-  gst_object_unref (testclock);
   return ret;
 }
 
@@ -1303,32 +1382,7 @@ gst_harness_wait_for_clock_id_waits (GstHarness * h, guint waits, guint timeout)
 gboolean
 gst_harness_crank_single_clock_wait (GstHarness * h)
 {
-  GstTestClock *testclock = gst_harness_get_testclock (h);
-  GstClockID res, pending;
-  gboolean ret = FALSE;
-
-  if (G_LIKELY (testclock != NULL)) {
-    gst_test_clock_wait_for_next_pending_id (testclock, &pending);
-
-    gst_test_clock_set_time (testclock, gst_clock_id_get_time (pending));
-    res = gst_test_clock_process_next_clock_id (testclock);
-    if (res == pending) {
-      GST_DEBUG ("cranked time %" GST_TIME_FORMAT,
-          GST_TIME_ARGS (gst_clock_get_time (GST_CLOCK (testclock))));
-      ret = TRUE;
-    } else {
-      GST_WARNING ("testclock next id != pending (%p != %p)", res, pending);
-    }
-
-    gst_clock_id_unref (res);
-    gst_clock_id_unref (pending);
-
-    gst_object_unref (testclock);
-  } else {
-    GST_WARNING ("No testclock on element %s", GST_ELEMENT_NAME (h->element));
-  }
-
-  return ret;
+  return gst_test_clock_crank (h->priv->testclock);
 }
 
 /**
@@ -1352,20 +1406,15 @@ gst_harness_crank_single_clock_wait (GstHarness * h)
 gboolean
 gst_harness_crank_multiple_clock_waits (GstHarness * h, guint waits)
 {
-  GstTestClock *testclock;
+  GstTestClock *testclock = h->priv->testclock;
   GList *pending;
   guint processed;
 
-  testclock = gst_harness_get_testclock (h);
-  if (testclock == NULL)
-    return FALSE;
-
   gst_test_clock_wait_for_multiple_pending_ids (testclock, waits, &pending);
   gst_harness_set_time (h, gst_test_clock_id_list_get_latest_time (pending));
   processed = gst_test_clock_process_id_list (testclock, pending);
 
   g_list_free_full (pending, gst_clock_id_unref);
-  gst_object_unref (testclock);
   return processed == waits;
 }
 
@@ -1452,6 +1501,15 @@ gst_harness_set_forwarding (GstHarness * h, gboolean forwarding)
     gst_harness_set_forwarding (h->sink_harness, forwarding);
 }
 
+static void
+gst_harness_set_forward_pad (GstHarness * h, GstPad * fwdpad)
+{
+  HARNESS_LOCK (h);
+  gst_object_replace ((GstObject **) & h->priv->sink_forward_pad,
+      (GstObject *) fwdpad);
+  HARNESS_UNLOCK (h);
+}
+
 /**
  * gst_harness_create_buffer:
  * @h: a #GstHarness
@@ -1538,6 +1596,8 @@ GstBuffer *
 gst_harness_pull (GstHarness * h)
 {
   GstHarnessPrivate *priv = h->priv;
+  GstBuffer *buf = (GstBuffer *) g_async_queue_timeout_pop (priv->buffer_queue,
+      G_USEC_PER_SEC * 60);
 
   if (priv->blocking_push_mode) {
     g_mutex_lock (&priv->blocking_push_mutex);
@@ -1545,8 +1605,7 @@ gst_harness_pull (GstHarness * h)
     g_mutex_unlock (&priv->blocking_push_mutex);
   }
 
-  return (GstBuffer *) g_async_queue_timeout_pop (priv->buffer_queue,
-      G_USEC_PER_SEC * 60);
+  return buf;
 }
 
 /**
@@ -1567,6 +1626,7 @@ GstBuffer *
 gst_harness_try_pull (GstHarness * h)
 {
   GstHarnessPrivate *priv = h->priv;
+  GstBuffer *buf = (GstBuffer *) g_async_queue_try_pop (priv->buffer_queue);
 
   if (priv->blocking_push_mode) {
     g_mutex_lock (&priv->blocking_push_mutex);
@@ -1574,7 +1634,7 @@ gst_harness_try_pull (GstHarness * h)
     g_mutex_unlock (&priv->blocking_push_mutex);
   }
 
-  return (GstBuffer *) g_async_queue_try_pop (priv->buffer_queue);
+  return buf;
 }
 
 /**
@@ -1681,9 +1741,12 @@ gst_harness_dump_to_file (GstHarness * h, const gchar * filename)
 
   while ((buf = g_async_queue_try_pop (priv->buffer_queue))) {
     GstMapInfo info;
-    gst_buffer_map (buf, &info, GST_MAP_READ);
-    fwrite (info.data, 1, info.size, fd);
-    gst_buffer_unmap (buf, &info);
+    if (gst_buffer_map (buf, &info, GST_MAP_READ)) {
+      fwrite (info.data, 1, info.size, fd);
+      gst_buffer_unmap (buf, &info);
+    } else {
+      GST_ERROR ("failed to map buffer %p", buf);
+    }
     gst_buffer_unref (buf);
   }
 
@@ -1952,10 +2015,6 @@ gst_harness_query_latency (GstHarness * h)
  *
  * Sets the min latency reported by #GstHarness when receiving a latency-query
  *
- * MT safe.
- *
- * Returns: a #GstClockTime with min latency
- *
  * Since: 1.6
  */
 void
@@ -2043,9 +2102,7 @@ gst_harness_add_src_harness (GstHarness * h,
   if (h->src_harness)
     gst_harness_teardown (h->src_harness);
   h->src_harness = src_harness;
-
-  h->src_harness->priv->sink_forward_pad = gst_object_ref (h->srcpad);
-  gst_harness_use_testclock (h->src_harness);
+  gst_harness_set_forward_pad (h->src_harness, h->srcpad);
   h->src_harness->priv->has_clock_wait = has_clock_wait;
   gst_harness_set_forwarding (h->src_harness, h->priv->forwarding);
 }
@@ -2159,16 +2216,17 @@ gst_harness_src_crank_and_push_many (GstHarness * h, gint cranks, gint pushes)
 {
   GstFlowReturn ret = GST_FLOW_OK;
   gboolean crank;
+  int i;
 
   g_assert (h->src_harness);
   gst_harness_play (h->src_harness);
 
-  for (int i = 0; i < cranks; i++) {
+  for (i = 0; i < cranks; i++) {
     crank = gst_harness_crank_single_clock_wait (h->src_harness);
     g_assert (crank);
   }
 
-  for (int i = 0; i < pushes; i++) {
+  for (i = 0; i < pushes; i++) {
     GstBuffer *buf;
     buf = gst_harness_pull (h->src_harness);
     g_assert (buf != NULL);
@@ -2233,12 +2291,11 @@ gst_harness_add_sink_harness (GstHarness * h, GstHarness * sink_harness)
   GstHarnessPrivate *priv = h->priv;
 
   if (h->sink_harness) {
-    gst_object_replace ((GstObject **) &priv->sink_forward_pad, NULL);
+    gst_harness_set_forward_pad (h, NULL);
     gst_harness_teardown (h->sink_harness);
   }
   h->sink_harness = sink_harness;
-  priv->sink_forward_pad = gst_object_ref (h->sink_harness->srcpad);
-  gst_harness_use_testclock (h->sink_harness);
+  gst_harness_set_forward_pad (h, h->sink_harness->srcpad);
   if (priv->forwarding && h->sinkpad)
     gst_pad_sticky_events_foreach (h->sinkpad, forward_sticky_events, h);
   gst_harness_set_forwarding (h->sink_harness, priv->forwarding);
@@ -2323,8 +2380,9 @@ GstFlowReturn
 gst_harness_sink_push_many (GstHarness * h, gint pushes)
 {
   GstFlowReturn ret = GST_FLOW_OK;
+  int i;
   g_assert (h->sink_harness);
-  for (int i = 0; i < pushes; i++) {
+  for (i = 0; i < pushes; i++) {
     ret = gst_harness_push_to_sink (h);
     if (ret != GST_FLOW_OK)
       break;
@@ -2511,7 +2569,9 @@ typedef struct
 {
   GstHarnessThread t;
 
-  GstEvent *event;
+  GstHarnessPrepareEventFunc func;
+  gpointer data;
+  GDestroyNotify notify;
 } GstHarnessPushEventThread;
 
 typedef struct
@@ -2572,7 +2632,8 @@ static void
 gst_harness_push_event_thread_free (GstHarnessPushEventThread * t)
 {
   if (t != NULL) {
-    gst_event_replace (&t->event, NULL);
+    if (t->notify != NULL)
+      t->notify (t->data);
     g_slice_free (GstHarnessPushEventThread, t);
   }
 }
@@ -2743,7 +2804,7 @@ gst_harness_stress_event_func (GstHarnessThread * t)
   guint count = 0;
 
   while (t->running) {
-    gst_harness_push_event (t->h, gst_event_ref (pet->event));
+    gst_harness_push_event (t->h, pet->func (t->h, pet->data));
 
     count++;
     g_usleep (t->sleep);
@@ -2758,7 +2819,7 @@ gst_harness_stress_upstream_event_func (GstHarnessThread * t)
   guint count = 0;
 
   while (t->running) {
-    gst_harness_push_upstream_event (t->h, gst_event_ref (pet->event));
+    gst_harness_push_upstream_event (t->h, pet->func (t->h, pet->data));
 
     count++;
     g_usleep (t->sleep);
@@ -2902,6 +2963,13 @@ gst_harness_ref_buffer (GstHarness * h, gpointer data)
   return gst_buffer_ref (GST_BUFFER_CAST (data));
 }
 
+static GstEvent *
+gst_harness_ref_event (GstHarness * h, gpointer data)
+{
+  (void) h;
+  return gst_event_ref (GST_EVENT_CAST (data));
+}
+
 /**
  * gst_harness_stress_push_buffer_start_full: (skip)
  * @h: a #GstHarness
@@ -2988,11 +3056,43 @@ GstHarnessThread *
 gst_harness_stress_push_event_start_full (GstHarness * h,
     GstEvent * event, gulong sleep)
 {
+  return gst_harness_stress_push_event_with_cb_start_full (h,
+      gst_harness_ref_event, gst_event_ref (event),
+      (GDestroyNotify) gst_event_unref, sleep);
+}
+
+/**
+ * gst_harness_stress_push_event_with_cb_start_full: (skip)
+ * @h: a #GstHarness
+ * @func: a #GstHarnessPrepareEventFunc function called before every iteration
+ * to prepare / create a #GstEvent for pushing
+ * @data: a #gpointer with data to the #GstHarnessPrepareEventFunc function
+ * @notify: a #GDestroyNotify that is called when thread is stopped
+ * @sleep: a #gulong specifying how long to sleep in (microseconds) for
+ * each call to gst_pad_push
+ *
+ * Push a #GstEvent returned by @func onto the harnessed #GstElement sinkpad
+ * in intervals of @sleep microseconds.
+ *
+ * MT safe.
+ *
+ * Returns: a #GstHarnessThread
+ *
+ * Since: 1.8
+ */
+GstHarnessThread *
+gst_harness_stress_push_event_with_cb_start_full (GstHarness * h,
+    GstHarnessPrepareEventFunc func, gpointer data, GDestroyNotify notify,
+    gulong sleep)
+{
   GstHarnessPushEventThread *t = g_slice_new0 (GstHarnessPushEventThread);
   gst_harness_thread_init (&t->t,
       (GDestroyNotify) gst_harness_push_event_thread_free, h, sleep);
 
-  t->event = gst_event_ref (event);
+  t->func = func;
+  t->data = data;
+  t->notify = notify;
+
   GST_HARNESS_THREAD_START (event, t);
   return &t->t;
 }
@@ -3017,11 +3117,43 @@ GstHarnessThread *
 gst_harness_stress_push_upstream_event_start_full (GstHarness * h,
     GstEvent * event, gulong sleep)
 {
+  return gst_harness_stress_push_upstream_event_with_cb_start_full (h,
+      gst_harness_ref_event, gst_event_ref (event),
+      (GDestroyNotify) gst_event_unref, sleep);
+}
+
+/**
+ * gst_harness_stress_push_upstream_event_with_cb_start_full: (skip)
+ * @h: a #GstHarness
+ * @func: a #GstHarnessPrepareEventFunc function called before every iteration
+ * to prepare / create a #GstEvent for pushing
+ * @data: a #gpointer with data to the #GstHarnessPrepareEventFunc function
+ * @notify: a #GDestroyNotify that is called when thread is stopped
+ * @sleep: a #gulong specifying how long to sleep in (microseconds) for
+ * each call to gst_pad_push
+ *
+ * Push a #GstEvent returned by @func onto the harnessed #GstElement srcpad
+ * in intervals of @sleep microseconds.
+ *
+ * MT safe.
+ *
+ * Returns: a #GstHarnessThread
+ *
+ * Since: 1.8
+ */
+GstHarnessThread *
+gst_harness_stress_push_upstream_event_with_cb_start_full (GstHarness * h,
+    GstHarnessPrepareEventFunc func, gpointer data, GDestroyNotify notify,
+    gulong sleep)
+{
   GstHarnessPushEventThread *t = g_slice_new0 (GstHarnessPushEventThread);
   gst_harness_thread_init (&t->t,
       (GDestroyNotify) gst_harness_push_event_thread_free, h, sleep);
 
-  t->event = gst_event_ref (event);
+  t->func = func;
+  t->data = data;
+  t->notify = notify;
+
   GST_HARNESS_THREAD_START (upstream_event, t);
   return &t->t;
 }