Port gtk-doc comments to their equivalent markdown syntax
[platform/upstream/gstreamer.git] / libs / gst / check / gstharness.c
index 831ca75..f917786 100644 (file)
 
 /**
  * SECTION:gstharness
+ * @title: GstHarness
  * @short_description: A test-harness for writing GStreamer unit tests
  * @see_also: #GstTestClock,\
  *
- * #GstHarness is ment to make writing unit test for GStreamer much easier.
- * It can be though of as a way of treating a #GstElement as a black box,
- * deterministially feeding it data, and controlling what data it outputs.
+ * #GstHarness is meant to make writing unit test for GStreamer much easier.
+ * It can be thought of as a way of treating a #GstElement as a black box,
+ * deterministically feeding it data, and controlling what data it outputs.
  *
- * The basic structure of #GstHarness is two "floating" #GstPads, that connects
+ * The basic structure of #GstHarness is two "floating" #GstPads that connect
  * to the harnessed #GstElement src and sink #GstPads like so:
  *
- * <programlisting>
+ * |[
  *           __________________________
  *  _____   |  _____            _____  |   _____
  * |     |  | |     |          |     | |  |     |
  * |_____|  | |_____|          |_____| |  |_____|
  *          |__________________________|
  *
- * </programlisting>
+ * ]|
  *
  * With this, you can now simulate any environment the #GstElement might find
  * itself in. By specifying the #GstCaps of the harness #GstPads, using
- * functions like gst_harness_set_src_caps or gst_harness_set_sink_caps_str,
- * you can test how the #GstElement interacts with different capssets.
+ * functions like gst_harness_set_src_caps() or gst_harness_set_sink_caps_str(),
+ * you can test how the #GstElement interacts with different caps sets.
  *
  * Your harnessed #GstElement can of course also be a bin, and using
- * gst_harness_new_parse supporting standard gst-launch syntax, you can
+ * gst_harness_new_parse() supporting standard gst-launch syntax, you can
  * easily test a whole pipeline instead of just one element.
  *
  * You can then go on to push #GstBuffers and #GstEvents on to the srcpad,
- * using functions like gst_harness_push and gst_harness_push_event, and
- * then pull them out to examine them with gst_harness_pull and
- * gst_harness_pull_event.
- *
- * <example>
- * <title>A simple buffer-in buffer-out example</title>
- *   <programlisting language="c">
- *   #include &lt;gst/gst.h&gt;
- *   #include &lt;gst/check/gstharness.h&gt;
+ * using functions like gst_harness_push() and gst_harness_push_event(), and
+ * then pull them out to examine them with gst_harness_pull() and
+ * gst_harness_pull_event().
+ *
+ * ## A simple buffer-in buffer-out example
+ *
+ * |[<!-- language="C" -->
+ *   #include <gst/gst.h>
+ *   #include <gst/check/gstharness.h>
  *   GstHarness *h;
  *   GstBuffer *in_buf;
  *   GstBuffer *out_buf;
  *   gst_buffer_unref (out_buf);
  *   gst_harness_teardown (h);
  *
- *   </programlisting>
- * </example>
+ *   ]|
  *
  * Another main feature of the #GstHarness is its integration with the
  * #GstTestClock. Operating the #GstTestClock can be very challenging, but
  * #GstHarness simplifies some of the most desired actions a lot, like wanting
  * to manually advance the clock while at the same time releasing a #GstClockID
- * that is waiting, with functions like gst_harness_crank_single_clock_wait.
+ * that is waiting, with functions like gst_harness_crank_single_clock_wait().
  *
  * #GstHarness also supports sub-harnesses, as a way of generating and
  * validating data. A sub-harness is another #GstHarness that is managed by
  * the "parent" harness, and can either be created by using the standard
  * gst_harness_new type functions directly on the (GstHarness *)->src_harness,
- * or using the much more convenient gst_harness_add_src or
- * gst_harness_add_sink_parse. If you have a decoder-element you want to test,
+ * or using the much more convenient gst_harness_add_src() or
+ * gst_harness_add_sink_parse(). If you have a decoder-element you want to test,
  * (like vp8dec) it can be very useful to add a src-harness with both a
  * src-element (videotestsrc) and an encoder (vp8enc) to feed the decoder data
  * with different configurations, by simply doing:
  *
- * <example>
- * <programlisting language="c">
+ * |[<!-- language="C" -->
  *   GstHarness * h = gst_harness_new (h, "vp8dec");
  *   gst_harness_add_src_parse (h, "videotestsrc is-live=1 ! vp8enc", TRUE);
- * </programlisting>
- * </example>
+ * ]|
  *
  * and then feeding it data with:
  *
- * <example>
- * <programlisting language="c">
+ * |[<!-- language="C" -->
  * gst_harness_push_from_src (h);
- * </programlisting>
- * </example>
+ * ]|
  *
  */
 #ifdef HAVE_CONFIG_H
 #include "config.h"
 #endif
 
+/* we have code with side effects in asserts, so make sure they are active */
+#ifdef G_DISABLE_ASSERT
+#error "GstHarness must be compiled with G_DISABLE_ASSERT undefined"
+#endif
+
 #include "gstharness.h"
 
 #include <stdio.h>
@@ -134,6 +135,8 @@ static void gst_harness_stress_free (GstHarnessThread * t);
 
 #define HARNESS_KEY "harness"
 #define HARNESS_REF "harness-ref"
+#define HARNESS_LOCK(h) g_mutex_lock (&(h)->priv->priv_mutex)
+#define HARNESS_UNLOCK(h) g_mutex_unlock (&(h)->priv->priv_mutex)
 
 static GstStaticPadTemplate hsrctemplate = GST_STATIC_PAD_TEMPLATE ("src",
     GST_PAD_SRC,
@@ -144,21 +147,25 @@ static GstStaticPadTemplate hsinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
     GST_PAD_ALWAYS,
     GST_STATIC_CAPS_ANY);
 
-struct _GstHarnessPrivate {
-  gchar * element_sinkpad_name;
-  gchar * element_srcpad_name;
+struct _GstHarnessPrivate
+{
+  gchar *element_sinkpad_name;
+  gchar *element_srcpad_name;
 
-  GstCaps * src_caps;
-  GstCaps * sink_caps;
-  GstPad * sink_forward_pad;
+  GstCaps *src_caps;
+  GstCaps *sink_caps;
+
+  gboolean forwarding;
+  GstPad *sink_forward_pad;
+  GstTestClock *testclock;
 
   volatile gint recv_buffers;
   volatile gint recv_events;
   volatile gint recv_upstream_events;
 
-  GAsyncQueue * buffer_queue;
-  GAsyncQueue * src_event_queue;
-  GAsyncQueue * sink_event_queue;
+  GAsyncQueue *buffer_queue;
+  GAsyncQueue *src_event_queue;
+  GAsyncQueue *sink_event_queue;
 
   GstClockTime latency_min;
   GstClockTime latency_max;
@@ -166,17 +173,18 @@ struct _GstHarnessPrivate {
   gboolean drop_buffers;
   GstClockTime last_push_ts;
 
-  GstBufferPool * pool;
-  GstAllocator * allocator;
+  GstBufferPool *pool;
+  GstAllocator *allocator;
   GstAllocationParams allocation_params;
-  GstAllocator * propose_allocator;
+  GstAllocator *propose_allocator;
   GstAllocationParams propose_allocation_params;
 
   gboolean blocking_push_mode;
   GCond blocking_push_cond;
   GMutex blocking_push_mutex;
+  GMutex priv_mutex;
 
-  GPtrArray * stress;
+  GPtrArray *stress;
 };
 
 static GstFlowReturn
@@ -219,6 +227,7 @@ gst_harness_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 {
   GstHarness *h = g_object_get_data (G_OBJECT (pad), HARNESS_KEY);
   GstHarnessPrivate *priv = h->priv;
+  gboolean ret = TRUE;
   gboolean forward;
 
   g_assert (h != NULL);
@@ -236,13 +245,19 @@ gst_harness_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       break;
   }
 
-  if (forward && priv->sink_forward_pad) {
-    gst_pad_push_event (priv->sink_forward_pad, event);
+  HARNESS_LOCK (h);
+  if (priv->forwarding && forward && priv->sink_forward_pad) {
+    GstPad *fwdpad = gst_object_ref (priv->sink_forward_pad);
+    HARNESS_UNLOCK (h);
+    ret = gst_pad_push_event (fwdpad, event);
+    gst_object_unref (fwdpad);
+    HARNESS_LOCK (h);
   } else {
     g_async_queue_push (priv->sink_event_queue, event);
   }
+  HARNESS_UNLOCK (h);
 
-  return TRUE;
+  return ret;
 }
 
 static void
@@ -336,7 +351,11 @@ gst_harness_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
     {
       GstCaps *caps, *filter = NULL;
 
-      caps = priv->sink_caps ? gst_caps_ref (priv->sink_caps) : gst_caps_new_any ();
+      if (priv->sink_caps) {
+        caps = gst_caps_ref (priv->sink_caps);
+      } else {
+        caps = gst_pad_get_pad_template_caps (pad);
+      }
 
       gst_query_parse_caps (query, &filter);
       if (filter != NULL) {
@@ -350,25 +369,31 @@ gst_harness_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
       break;
     case GST_QUERY_ALLOCATION:
     {
-      if (priv->sink_forward_pad != NULL) {
+      HARNESS_LOCK (h);
+      if (priv->forwarding && priv->sink_forward_pad != NULL) {
         GstPad *peer = gst_pad_get_peer (priv->sink_forward_pad);
         g_assert (peer != NULL);
+        HARNESS_UNLOCK (h);
         res = gst_pad_query (peer, query);
         gst_object_unref (peer);
+        HARNESS_LOCK (h);
       } else {
         GstCaps *caps;
         gboolean need_pool;
+        guint size;
 
         gst_query_parse_allocation (query, &caps, &need_pool);
 
         /* FIXME: Can this be removed? */
-        g_assert_cmpuint (0, ==, gst_query_get_n_allocation_params (query));
+        size = gst_query_get_n_allocation_params (query);
+        g_assert_cmpuint (0, ==, size);
         gst_query_add_allocation_param (query,
             priv->propose_allocator, &priv->propose_allocation_params);
 
         GST_DEBUG_OBJECT (pad, "proposing allocation %" GST_PTR_FORMAT,
             priv->propose_allocator);
       }
+      HARNESS_UNLOCK (h);
       break;
     }
     default:
@@ -394,7 +419,11 @@ gst_harness_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
     {
       GstCaps *caps, *filter = NULL;
 
-      caps = priv->src_caps ? gst_caps_ref (priv->src_caps) : gst_caps_new_any ();
+      if (priv->src_caps) {
+        caps = gst_caps_ref (priv->src_caps);
+      } else {
+        caps = gst_pad_get_pad_template_caps (pad);
+      }
 
       gst_query_parse_caps (query, &filter);
       if (filter != NULL) {
@@ -415,7 +444,10 @@ gst_harness_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
 static void
 gst_harness_element_ref (GstHarness * h)
 {
-  guint *data = g_object_get_data (G_OBJECT (h->element), HARNESS_REF);
+  guint *data;
+
+  GST_OBJECT_LOCK (h->element);
+  data = g_object_get_data (G_OBJECT (h->element), HARNESS_REF);
   if (data == NULL) {
     data = g_new0 (guint, 1);
     *data = 1;
@@ -423,15 +455,23 @@ gst_harness_element_ref (GstHarness * h)
   } else {
     (*data)++;
   }
+  GST_OBJECT_UNLOCK (h->element);
 }
 
 static guint
 gst_harness_element_unref (GstHarness * h)
 {
-  guint *data = g_object_get_data (G_OBJECT (h->element), HARNESS_REF);
+  guint *data;
+  guint ret;
+
+  GST_OBJECT_LOCK (h->element);
+  data = g_object_get_data (G_OBJECT (h->element), HARNESS_REF);
   g_assert (data != NULL);
   (*data)--;
-  return *data;
+  ret = *data;
+  GST_OBJECT_UNLOCK (h->element);
+
+  return ret;
 }
 
 static void
@@ -441,10 +481,12 @@ gst_harness_link_element_srcpad (GstHarness * h,
   GstHarnessPrivate *priv = h->priv;
   GstPad *srcpad = gst_element_get_static_pad (h->element,
       element_srcpad_name);
+  GstPadLinkReturn link;
   if (srcpad == NULL)
     srcpad = gst_element_get_request_pad (h->element, element_srcpad_name);
   g_assert (srcpad);
-  g_assert_cmpint (gst_pad_link (srcpad, h->sinkpad), ==, GST_PAD_LINK_OK);
+  link = gst_pad_link (srcpad, h->sinkpad);
+  g_assert_cmpint (link, ==, GST_PAD_LINK_OK);
   g_free (priv->element_srcpad_name);
   priv->element_srcpad_name = gst_pad_get_name (srcpad);
 
@@ -458,10 +500,12 @@ gst_harness_link_element_sinkpad (GstHarness * h,
   GstHarnessPrivate *priv = h->priv;
   GstPad *sinkpad = gst_element_get_static_pad (h->element,
       element_sinkpad_name);
+  GstPadLinkReturn link;
   if (sinkpad == NULL)
     sinkpad = gst_element_get_request_pad (h->element, element_sinkpad_name);
   g_assert (sinkpad);
-  g_assert_cmpint (gst_pad_link (h->srcpad, sinkpad), ==, GST_PAD_LINK_OK);
+  link = gst_pad_link (h->srcpad, sinkpad);
+  g_assert_cmpint (link, ==, GST_PAD_LINK_OK);
   g_free (priv->element_sinkpad_name);
   priv->element_sinkpad_name = gst_pad_get_name (sinkpad);
 
@@ -474,6 +518,7 @@ gst_harness_setup_src_pad (GstHarness * h,
 {
   GstHarnessPrivate *priv = h->priv;
   g_assert (src_tmpl);
+  g_assert (h->srcpad == NULL);
 
   priv->src_event_queue =
       g_async_queue_new_full ((GDestroyNotify) gst_event_unref);
@@ -498,6 +543,7 @@ gst_harness_setup_sink_pad (GstHarness * h,
 {
   GstHarnessPrivate *priv = h->priv;
   g_assert (sink_tmpl);
+  g_assert (h->sinkpad == NULL);
 
   priv->buffer_queue = g_async_queue_new_full (
       (GDestroyNotify) gst_buffer_unref);
@@ -520,6 +566,28 @@ gst_harness_setup_sink_pad (GstHarness * h,
 }
 
 static void
+check_element_type (GstElement * element, gboolean * has_sinkpad,
+    gboolean * has_srcpad)
+{
+  GstElementClass *element_class = GST_ELEMENT_GET_CLASS (element);
+  const GList *tmpl_list;
+
+  *has_srcpad = element->numsrcpads > 0;
+  *has_sinkpad = element->numsinkpads > 0;
+
+  tmpl_list = gst_element_class_get_pad_template_list (element_class);
+
+  while (tmpl_list) {
+    GstPadTemplate *pad_tmpl = (GstPadTemplate *) tmpl_list->data;
+    tmpl_list = g_list_next (tmpl_list);
+    if (GST_PAD_TEMPLATE_DIRECTION (pad_tmpl) == GST_PAD_SRC)
+      *has_srcpad |= TRUE;
+    if (GST_PAD_TEMPLATE_DIRECTION (pad_tmpl) == GST_PAD_SINK)
+      *has_sinkpad |= TRUE;
+  }
+}
+
+static void
 turn_async_and_sync_off (GstElement * element)
 {
   GObjectClass *class = G_OBJECT_GET_CLASS (element);
@@ -533,47 +601,36 @@ static gboolean
 gst_pad_is_request_pad (GstPad * pad)
 {
   GstPadTemplate *temp;
+  gboolean is_request;
+
   if (pad == NULL)
     return FALSE;
   temp = gst_pad_get_pad_template (pad);
   if (temp == NULL)
     return FALSE;
-  return GST_PAD_TEMPLATE_PRESENCE (temp) == GST_PAD_REQUEST;
+  is_request = GST_PAD_TEMPLATE_PRESENCE (temp) == GST_PAD_REQUEST;
+  gst_object_unref (temp);
+  return is_request;
 }
 
 /**
- * gst_harness_new_full: (skip)
- * @element: a #GstElement to attach the harness to (transfer none)
- * @hsrc: (allow-none): a #GstStaticPadTemplate describing the harness srcpad.
- * %NULL will not create a harness srcpad.
- * @sinkpad: (allow-none): a #gchar with the name of the element sinkpad that is
- * then linked to the harness srcpad. Can be a static or request or a sometimes
- * pad that has been added. %NULL will not get/request a sinkpad from the
- * element. (Like if the element is a src)
- * @hsink: (allow-none): a #GstStaticPadTemplate describing the harness sinkpad.
- * %NULL will not create a harness sinkpad.
- * @srcpad: (allow-none): a #gchar with the name of the element srcpad that is
- * then linked to the harness sinkpad, similar to the @sinkpad.
+ * gst_harness_new_empty: (skip)
  *
- * Creates a new harness.
+ * Creates a new empty harness. Use gst_harness_add_element_full() to add
+ * an #GstElement to it.
  *
  * MT safe.
  *
  * Returns: (transfer full): a #GstHarness, or %NULL if the harness could
  * not be created
  *
- * Since: 1.6
+ * Since: 1.8
  */
 GstHarness *
-gst_harness_new_full (GstElement * element,
-    GstStaticPadTemplate * hsrc, const gchar * sinkpad,
-    GstStaticPadTemplate * hsink, const gchar * srcpad)
+gst_harness_new_empty (void)
 {
   GstHarness *h;
   GstHarnessPrivate *priv;
-  gboolean is_sink, is_src;
-
-  g_return_val_if_fail (element != NULL, NULL);
 
   h = g_new0 (GstHarness, 1);
   g_assert (h != NULL);
@@ -581,67 +638,157 @@ gst_harness_new_full (GstElement * element,
   priv = h->priv;
 
   GST_DEBUG_OBJECT (h, "about to create new harness %p", h);
-  h->element = gst_object_ref (element);
   priv->last_push_ts = GST_CLOCK_TIME_NONE;
   priv->latency_min = 0;
   priv->latency_max = GST_CLOCK_TIME_NONE;
   priv->drop_buffers = FALSE;
+  priv->testclock = GST_TEST_CLOCK_CAST (gst_test_clock_new ());
 
   priv->propose_allocator = NULL;
   gst_allocation_params_init (&priv->propose_allocation_params);
 
   g_mutex_init (&priv->blocking_push_mutex);
   g_cond_init (&priv->blocking_push_cond);
+  g_mutex_init (&priv->priv_mutex);
+
+  priv->stress = g_ptr_array_new_with_free_func (
+      (GDestroyNotify) gst_harness_stress_free);
+
+  /* we have forwarding on as a default */
+  gst_harness_set_forwarding (h, TRUE);
 
-  is_src = GST_OBJECT_FLAG_IS_SET (element, GST_ELEMENT_FLAG_SOURCE);
-  is_sink = GST_OBJECT_FLAG_IS_SET (element, GST_ELEMENT_FLAG_SINK);
+  return h;
+}
+
+/**
+ * gst_harness_add_element_full: (skip)
+ * @h: a #GstHarness
+ * @element: a #GstElement to add to the harness (transfer none)
+ * @hsrc: (allow-none): a #GstStaticPadTemplate describing the harness srcpad.
+ * %NULL will not create a harness srcpad.
+ * @element_sinkpad_name: (allow-none): a #gchar with the name of the element
+ * sinkpad that is then linked to the harness srcpad. Can be a static or request
+ * or a sometimes pad that has been added. %NULL will not get/request a sinkpad
+ * from the element. (Like if the element is a src.)
+ * @hsink: (allow-none): a #GstStaticPadTemplate describing the harness sinkpad.
+ * %NULL will not create a harness sinkpad.
+ * @element_srcpad_name: (allow-none): a #gchar with the name of the element
+ * srcpad that is then linked to the harness sinkpad, similar to the
+ * @element_sinkpad_name.
+ *
+ * Adds a #GstElement to an empty #GstHarness
+ *
+ * MT safe.
+ *
+ * Since: 1.6
+ */
+void
+gst_harness_add_element_full (GstHarness * h, GstElement * element,
+    GstStaticPadTemplate * hsrc, const gchar * element_sinkpad_name,
+    GstStaticPadTemplate * hsink, const gchar * element_srcpad_name)
+{
+  GstClock *element_clock;
+  gboolean has_sinkpad, has_srcpad;
+
+  g_return_if_fail (element != NULL);
+  g_return_if_fail (h->element == NULL);
+
+  element_clock = GST_ELEMENT_CLOCK (element);
+  h->element = gst_object_ref (element);
+  check_element_type (element, &has_sinkpad, &has_srcpad);
 
   /* setup the loose srcpad linked to the element sinkpad */
-  if (!is_src && hsrc)
-    gst_harness_setup_src_pad (h, hsrc, sinkpad);
+  if (has_sinkpad)
+    gst_harness_setup_src_pad (h, hsrc, element_sinkpad_name);
 
   /* setup the loose sinkpad linked to the element srcpad */
-  if (!is_sink && hsink)
-    gst_harness_setup_sink_pad (h, hsink, srcpad);
+  if (has_srcpad)
+    gst_harness_setup_sink_pad (h, hsink, element_srcpad_name);
 
   /* as a harness sink, we should not need sync and async */
-  if (is_sink)
+  if (has_sinkpad && !has_srcpad)
     turn_async_and_sync_off (h->element);
 
   if (h->srcpad != NULL) {
+    gboolean handled;
     gchar *stream_id = g_strdup_printf ("%s-%p",
         GST_OBJECT_NAME (h->element), h);
-    g_assert (gst_pad_push_event (h->srcpad,
-            gst_event_new_stream_start (stream_id)));
+    handled = gst_pad_push_event (h->srcpad,
+        gst_event_new_stream_start (stream_id));
+    g_assert (handled);
     g_free (stream_id);
   }
 
+  /* if the element already has a testclock attached,
+     we replace our own with it, if no clock we attach the testclock */
+  if (element_clock) {
+    if (GST_IS_TEST_CLOCK (element_clock)) {
+      gst_object_replace ((GstObject **) & h->priv->testclock,
+          (GstObject *) GST_ELEMENT_CLOCK (element));
+    }
+  } else {
+    gst_harness_use_testclock (h);
+  }
+
   /* don't start sources, they start producing data! */
-  if (!is_src)
+  if (has_sinkpad)
     gst_harness_play (h);
 
   gst_harness_element_ref (h);
 
-  GST_DEBUG_OBJECT (h, "created new harness %p "
-      "with srcpad (%p, %s, %s) and sinkpad (%p, %s, %s)",
+  GST_DEBUG_OBJECT (h, "added element to harness %p "
+      "with element_srcpad_name (%p, %s, %s) and element_sinkpad_name (%p, %s, %s)",
       h, h->srcpad, GST_DEBUG_PAD_NAME (h->srcpad),
       h->sinkpad, GST_DEBUG_PAD_NAME (h->sinkpad));
+}
 
-  priv->stress = g_ptr_array_new_with_free_func (
-      (GDestroyNotify) gst_harness_stress_free);
-
+/**
+ * gst_harness_new_full: (skip)
+ * @element: a #GstElement to attach the harness to (transfer none)
+ * @hsrc: (allow-none): a #GstStaticPadTemplate describing the harness srcpad.
+ * %NULL will not create a harness srcpad.
+ * @element_sinkpad_name: (allow-none): a #gchar with the name of the element
+ * sinkpad that is then linked to the harness srcpad. Can be a static or request
+ * or a sometimes pad that has been added. %NULL will not get/request a sinkpad
+ * from the element. (Like if the element is a src.)
+ * @hsink: (allow-none): a #GstStaticPadTemplate describing the harness sinkpad.
+ * %NULL will not create a harness sinkpad.
+ * @element_srcpad_name: (allow-none): a #gchar with the name of the element
+ * srcpad that is then linked to the harness sinkpad, similar to the
+ * @element_sinkpad_name.
+ *
+ * Creates a new harness.
+ *
+ * MT safe.
+ *
+ * Returns: (transfer full): a #GstHarness, or %NULL if the harness could
+ * not be created
+ *
+ * Since: 1.6
+ */
+GstHarness *
+gst_harness_new_full (GstElement * element,
+    GstStaticPadTemplate * hsrc, const gchar * element_sinkpad_name,
+    GstStaticPadTemplate * hsink, const gchar * element_srcpad_name)
+{
+  GstHarness *h;
+  h = gst_harness_new_empty ();
+  gst_harness_add_element_full (h, element,
+      hsrc, element_sinkpad_name, hsink, element_srcpad_name);
   return h;
 }
 
 /**
  * gst_harness_new_with_element: (skip)
  * @element: a #GstElement to attach the harness to (transfer none)
- * @sinkpad: (allow-none): a #gchar with the name of the element sinkpad that
- * is then linked to the harness srcpad. %NULL does not attach a sinkpad
- * @srcpad: (allow-none): a #gchar with the name of the element srcpad that is
- * then linked to the harness sinkpad. %NULL does not attach a srcpad
- *
- * Creates a new harness. Works in the same way as gst_harness_new_full, only
+ * @element_sinkpad_name: (allow-none): a #gchar with the name of the element
+ * sinkpad that is then linked to the harness srcpad. %NULL does not attach a
+ * sinkpad
+ * @element_srcpad_name: (allow-none): a #gchar with the name of the element
+ * srcpad that is then linked to the harness sinkpad. %NULL does not attach a
+ * srcpad
+ *
+ * Creates a new harness. Works in the same way as gst_harness_new_full(), only
  * that generic padtemplates are used for the harness src and sinkpads, which
  * will be sufficient in most usecases.
  *
@@ -654,21 +801,23 @@ gst_harness_new_full (GstElement * element,
  */
 GstHarness *
 gst_harness_new_with_element (GstElement * element,
-    const gchar * sinkpad, const gchar * srcpad)
+    const gchar * element_sinkpad_name, const gchar * element_srcpad_name)
 {
   return gst_harness_new_full (element,
-      &hsrctemplate, sinkpad, &hsinktemplate, srcpad);
+      &hsrctemplate, element_sinkpad_name, &hsinktemplate, element_srcpad_name);
 }
 
 /**
  * gst_harness_new_with_padnames: (skip)
  * @element_name: a #gchar describing the #GstElement name
- * @sinkpad: (allow-none): a #gchar with the name of the element sinkpad that
- * is then linked to the harness srcpad. %NULL does not attach a sinkpad
- * @srcpad: (allow-none): a #gchar with the name of the element srcpad that is
- * then linked to the harness sinkpad. %NULL does not attach a srcpad
- *
- * Creates a new harness. Works in the same way as gst_harness_new_with_element,
+ * @element_sinkpad_name: (allow-none): a #gchar with the name of the element
+ * sinkpad that is then linked to the harness srcpad. %NULL does not attach a
+ * sinkpad
+ * @element_srcpad_name: (allow-none): a #gchar with the name of the element
+ * srcpad that is then linked to the harness sinkpad. %NULL does not attach a
+ * srcpad
+ *
+ * Creates a new harness. Works like gst_harness_new_with_element(),
  * except you specify the factoryname of the #GstElement
  *
  * MT safe.
@@ -680,13 +829,14 @@ gst_harness_new_with_element (GstElement * element,
  */
 GstHarness *
 gst_harness_new_with_padnames (const gchar * element_name,
-    const gchar * sinkpad, const gchar * srcpad)
+    const gchar * element_sinkpad_name, const gchar * element_srcpad_name)
 {
   GstHarness *h;
   GstElement *element = gst_element_factory_make (element_name, NULL);
   g_assert (element != NULL);
 
-  h = gst_harness_new_with_element (element, sinkpad, srcpad);
+  h = gst_harness_new_with_element (element, element_sinkpad_name,
+      element_srcpad_name);
   gst_object_unref (element);
   return h;
 }
@@ -699,7 +849,7 @@ gst_harness_new_with_padnames (const gchar * element_name,
  * @hsink: (allow-none): a #GstStaticPadTemplate describing the harness sinkpad.
  * %NULL will not create a harness sinkpad.
  *
- * Creates a new harness, like gst_harness_new_full, except it
+ * Creates a new harness, like gst_harness_new_full(), except it
  * assumes the #GstElement sinkpad is named "sink" and srcpad is named "src"
  *
  * MT safe.
@@ -726,7 +876,7 @@ gst_harness_new_with_templates (const gchar * element_name,
  * gst_harness_new: (skip)
  * @element_name: a #gchar describing the #GstElement name
  *
- * Creates a new harness. Works like gst_harness_new_with_padnames, except it
+ * Creates a new harness. Works like gst_harness_new_with_padnames(), except it
  * assumes the #GstElement sinkpad is named "sink" and srcpad is named "src"
  *
  * MT safe.
@@ -743,30 +893,27 @@ gst_harness_new (const gchar * element_name)
 }
 
 /**
- * gst_harness_new_parse: (skip)
+ * gst_harness_add_parse: (skip)
+ * @h: a #GstHarness
  * @launchline: a #gchar describing a gst-launch type line
  *
- * Creates a new harness, parsing the @launchline and putting that in a #GstBin,
- * and then attches the harness to the bin.
+ * Parses the @launchline and puts that in a #GstBin,
+ * and then attches the supplied #GstHarness to the bin.
  *
  * MT safe.
  *
- * Returns: (transfer full): a #GstHarness, or %NULL if the harness could
- * not be created
- *
  * Since: 1.6
  */
-GstHarness *
-gst_harness_new_parse (const gchar * launchline)
+void
+gst_harness_add_parse (GstHarness * h, const gchar * launchline)
 {
-  GstHarness *h;
   GstBin *bin;
   gchar *desc;
   GstPad *pad;
   GstIterator *iter;
   gboolean done = FALSE;
 
-  g_return_val_if_fail (launchline != NULL, NULL);
+  g_return_if_fail (launchline != NULL);
 
   desc = g_strdup_printf ("bin.( %s )", launchline);
   bin =
@@ -774,7 +921,7 @@ gst_harness_new_parse (const gchar * launchline)
   g_free (desc);
 
   if (G_UNLIKELY (bin == NULL))
-    return NULL;
+    return;
 
   /* find pads and ghost them if necessary */
   if ((pad = gst_bin_find_unlinked_pad (bin, GST_PAD_SRC)) != NULL) {
@@ -804,15 +951,37 @@ gst_harness_new_parse (const gchar * launchline)
       case GST_ITERATOR_ERROR:
         gst_object_unref (bin);
         gst_iterator_free (iter);
-        g_return_val_if_reached (NULL);
+        g_return_if_reached ();
         break;
     }
   }
   gst_iterator_free (iter);
 
-  h = gst_harness_new_full (GST_ELEMENT_CAST (bin),
+  gst_harness_add_element_full (h, GST_ELEMENT_CAST (bin),
       &hsrctemplate, "sink", &hsinktemplate, "src");
   gst_object_unref (bin);
+}
+
+/**
+ * gst_harness_new_parse: (skip)
+ * @launchline: a #gchar describing a gst-launch type line
+ *
+ * Creates a new harness, parsing the @launchline and putting that in a #GstBin,
+ * and then attches the harness to the bin.
+ *
+ * MT safe.
+ *
+ * Returns: (transfer full): a #GstHarness, or %NULL if the harness could
+ * not be created
+ *
+ * Since: 1.6
+ */
+GstHarness *
+gst_harness_new_parse (const gchar * launchline)
+{
+  GstHarness *h;
+  h = gst_harness_new_empty ();
+  gst_harness_add_parse (h, launchline);
   return h;
 }
 
@@ -842,6 +1011,7 @@ gst_harness_teardown (GstHarness * h)
     gst_harness_teardown (h->src_harness);
   }
 
+  gst_object_replace ((GstObject **) & priv->sink_forward_pad, NULL);
   if (h->sink_harness) {
     gst_harness_teardown (h->sink_harness);
   }
@@ -875,29 +1045,31 @@ gst_harness_teardown (GstHarness * h)
     g_async_queue_unref (priv->sink_event_queue);
   }
 
-  if (priv->sink_forward_pad)
-    gst_object_unref (priv->sink_forward_pad);
-
   gst_object_replace ((GstObject **) & priv->propose_allocator, NULL);
   gst_object_replace ((GstObject **) & priv->allocator, NULL);
   gst_object_replace ((GstObject **) & priv->pool, NULL);
 
   /* if we hold the last ref, set to NULL */
   if (gst_harness_element_unref (h) == 0) {
+    gboolean state_change;
     GstState state, pending;
-    g_assert (gst_element_set_state (h->element, GST_STATE_NULL) ==
-        GST_STATE_CHANGE_SUCCESS);
-    g_assert (gst_element_get_state (h->element, &state, &pending, 0) ==
-        GST_STATE_CHANGE_SUCCESS);
+    state_change = gst_element_set_state (h->element, GST_STATE_NULL);
+    g_assert (state_change == GST_STATE_CHANGE_SUCCESS);
+    state_change = gst_element_get_state (h->element, &state, &pending, 0);
+    g_assert (state_change == GST_STATE_CHANGE_SUCCESS);
     g_assert (state == GST_STATE_NULL);
   }
 
   g_cond_clear (&priv->blocking_push_cond);
   g_mutex_clear (&priv->blocking_push_mutex);
+  g_mutex_clear (&priv->priv_mutex);
 
   g_ptr_array_unref (priv->stress);
 
   gst_object_unref (h->element);
+
+  gst_object_replace ((GstObject **) & priv->testclock, NULL);
+
   g_free (h->priv);
   g_free (h);
 }
@@ -919,8 +1091,11 @@ void
 gst_harness_add_element_src_pad (GstHarness * h, GstPad * srcpad)
 {
   GstHarnessPrivate *priv = h->priv;
-  gst_harness_setup_sink_pad (h, &hsinktemplate, NULL);
-  g_assert_cmpint (gst_pad_link (srcpad, h->sinkpad), ==, GST_PAD_LINK_OK);
+  GstPadLinkReturn link;
+  if (h->sinkpad == NULL)
+    gst_harness_setup_sink_pad (h, &hsinktemplate, NULL);
+  link = gst_pad_link (srcpad, h->sinkpad);
+  g_assert_cmpint (link, ==, GST_PAD_LINK_OK);
   g_free (priv->element_srcpad_name);
   priv->element_srcpad_name = gst_pad_get_name (srcpad);
 }
@@ -940,8 +1115,11 @@ void
 gst_harness_add_element_sink_pad (GstHarness * h, GstPad * sinkpad)
 {
   GstHarnessPrivate *priv = h->priv;
-  gst_harness_setup_src_pad (h, &hsrctemplate, NULL);
-  g_assert_cmpint (gst_pad_link (h->srcpad, sinkpad), ==, GST_PAD_LINK_OK);
+  GstPadLinkReturn link;
+  if (h->srcpad == NULL)
+    gst_harness_setup_src_pad (h, &hsrctemplate, NULL);
+  link = gst_pad_link (h->srcpad, sinkpad);
+  g_assert_cmpint (link, ==, GST_PAD_LINK_OK);
   g_free (priv->element_sinkpad_name);
   priv->element_sinkpad_name = gst_pad_get_name (sinkpad);
 }
@@ -963,12 +1141,14 @@ gst_harness_set_src_caps (GstHarness * h, GstCaps * caps)
 {
   GstHarnessPrivate *priv = h->priv;
   GstSegment segment;
+  gboolean handled;
 
-  g_assert (gst_pad_push_event (h->srcpad, gst_event_new_caps (caps)));
+  handled = gst_pad_push_event (h->srcpad, gst_event_new_caps (caps));
+  g_assert (handled);
   gst_caps_take (&priv->src_caps, caps);
 
   gst_segment_init (&segment, GST_FORMAT_TIME);
-  g_assert (gst_pad_push_event (h->srcpad, gst_event_new_segment (&segment)));
+  handled = gst_pad_push_event (h->srcpad, gst_event_new_segment (&segment));
 }
 
 /**
@@ -1096,10 +1276,7 @@ gst_harness_use_systemclock (GstHarness * h)
 void
 gst_harness_use_testclock (GstHarness * h)
 {
-  GstClock *clock = gst_test_clock_new ();
-  g_assert (clock != NULL);
-  gst_element_set_clock (h->element, clock);
-  gst_object_unref (clock);
+  gst_element_set_clock (h->element, GST_CLOCK_CAST (h->priv->testclock));
 }
 
 /**
@@ -1119,17 +1296,7 @@ gst_harness_use_testclock (GstHarness * h)
 GstTestClock *
 gst_harness_get_testclock (GstHarness * h)
 {
-  GstTestClock *testclock = NULL;
-  GstClock *clock;
-
-  clock = gst_element_get_clock (h->element);
-  if (clock) {
-    if (GST_IS_TEST_CLOCK (clock))
-      testclock = GST_TEST_CLOCK (clock);
-    else
-      gst_object_unref (clock);
-  }
-  return testclock;
+  return gst_object_ref (h->priv->testclock);
 }
 
 /**
@@ -1148,13 +1315,7 @@ gst_harness_get_testclock (GstHarness * h)
 gboolean
 gst_harness_set_time (GstHarness * h, GstClockTime time)
 {
-  GstTestClock *testclock;
-  testclock = gst_harness_get_testclock (h);
-  if (testclock == NULL)
-    return FALSE;
-
-  gst_test_clock_set_time (testclock, time);
-  gst_object_unref (testclock);
+  gst_test_clock_set_time (h->priv->testclock, time);
   return TRUE;
 }
 
@@ -1180,13 +1341,10 @@ gst_harness_set_time (GstHarness * h, GstClockTime time)
 gboolean
 gst_harness_wait_for_clock_id_waits (GstHarness * h, guint waits, guint timeout)
 {
-  GstTestClock *testclock = gst_harness_get_testclock (h);
+  GstTestClock *testclock = h->priv->testclock;
   gint64 start_time;
   gboolean ret;
 
-  if (testclock == NULL)
-    return FALSE;
-
   start_time = g_get_monotonic_time ();
   while (gst_test_clock_peek_id_count (testclock) < waits) {
     gint64 time_spent;
@@ -1199,7 +1357,6 @@ gst_harness_wait_for_clock_id_waits (GstHarness * h, guint waits, guint timeout)
 
   ret = (waits == gst_test_clock_peek_id_count (testclock));
 
-  gst_object_unref (testclock);
   return ret;
 }
 
@@ -1225,32 +1382,7 @@ gst_harness_wait_for_clock_id_waits (GstHarness * h, guint waits, guint timeout)
 gboolean
 gst_harness_crank_single_clock_wait (GstHarness * h)
 {
-  GstTestClock *testclock = gst_harness_get_testclock (h);
-  GstClockID res, pending;
-  gboolean ret = FALSE;
-
-  if (G_LIKELY (testclock != NULL)) {
-    gst_test_clock_wait_for_next_pending_id (testclock, &pending);
-
-    gst_test_clock_set_time (testclock, gst_clock_id_get_time (pending));
-    res = gst_test_clock_process_next_clock_id (testclock);
-    if (res == pending) {
-      GST_DEBUG ("cranked time %" GST_TIME_FORMAT,
-          GST_TIME_ARGS (gst_clock_get_time (GST_CLOCK (testclock))));
-      ret = TRUE;
-    } else {
-      GST_WARNING ("testclock next id != pending (%p != %p)", res, pending);
-    }
-
-    gst_clock_id_unref (res);
-    gst_clock_id_unref (pending);
-
-    gst_object_unref (testclock);
-  } else {
-    GST_WARNING ("No testclock on element %s", GST_ELEMENT_NAME (h->element));
-  }
-
-  return ret;
+  return gst_test_clock_crank (h->priv->testclock);
 }
 
 /**
@@ -1258,7 +1390,7 @@ gst_harness_crank_single_clock_wait (GstHarness * h)
  * @h: a #GstHarness
  * @waits: a #guint describing the number of #GstClockIDs to crank
  *
- * Similar to gst_harness_crank_single_clock_wait, this is the function to use
+ * Similar to gst_harness_crank_single_clock_wait(), this is the function to use
  * if your harnessed element(s) are using more then one gst_clock_id_wait.
  * Failing to do so can (and will) make it racy which #GstClockID you actually
  * are releasing, where as this function will process all the waits at the
@@ -1274,20 +1406,15 @@ gst_harness_crank_single_clock_wait (GstHarness * h)
 gboolean
 gst_harness_crank_multiple_clock_waits (GstHarness * h, guint waits)
 {
-  GstTestClock *testclock;
+  GstTestClock *testclock = h->priv->testclock;
   GList *pending;
   guint processed;
 
-  testclock = gst_harness_get_testclock (h);
-  if (testclock == NULL)
-    return FALSE;
-
   gst_test_clock_wait_for_multiple_pending_ids (testclock, waits, &pending);
   gst_harness_set_time (h, gst_test_clock_id_list_get_latest_time (pending));
   processed = gst_test_clock_process_id_list (testclock, pending);
 
   g_list_free_full (pending, gst_clock_id_unref);
-  gst_object_unref (testclock);
   return processed == waits;
 }
 
@@ -1301,7 +1428,7 @@ gst_harness_crank_multiple_clock_waits (GstHarness * h, guint waits)
  * Non-src #GstElements (like sinks and filters) are automatically set to
  * playing by the #GstHarness, but src #GstElements are not to avoid them
  * starting to produce buffers.
- * Hence, for src #GstElement you will need to call gst_harness_play explicitly.
+ * Hence, for src #GstElement you must call gst_harness_play() explicitly.
  *
  * MT safe.
  *
@@ -1311,10 +1438,11 @@ void
 gst_harness_play (GstHarness * h)
 {
   GstState state, pending;
-  g_assert_cmpint (GST_STATE_CHANGE_SUCCESS, ==,
-      gst_element_set_state (h->element, GST_STATE_PLAYING));
-  g_assert_cmpint (GST_STATE_CHANGE_SUCCESS, ==,
-      gst_element_get_state (h->element, &state, &pending, 0));
+  gboolean state_change;
+  state_change = gst_element_set_state (h->element, GST_STATE_PLAYING);
+  g_assert_cmpint (GST_STATE_CHANGE_SUCCESS, ==, state_change);
+  state_change = gst_element_get_state (h->element, &state, &pending, 0);
+  g_assert_cmpint (GST_STATE_CHANGE_SUCCESS, ==, state_change);
   g_assert_cmpint (GST_STATE_PLAYING, ==, state);
 }
 
@@ -1323,9 +1451,9 @@ gst_harness_play (GstHarness * h)
  * @h: a #GstHarness
  *
  * Setting this will make the harness block in the chain-function, and
- * then release when gst_harness_pull or gst_harness_try_pull is called.
+ * then release when gst_harness_pull() or gst_harness_try_pull() is called.
  * Can be useful when wanting to control a src-element that is not implementing
- * gst_clock_id_wait so it can't be controlled by the #GstTestClock, since
+ * gst_clock_id_wait() so it can't be controlled by the #GstTestClock, since
  * it otherwise would produce buffers as fast as possible.
  *
  * MT safe.
@@ -1340,12 +1468,55 @@ gst_harness_set_blocking_push_mode (GstHarness * h)
 }
 
 /**
+ * gst_harness_set_forwarding:
+ * @h: a #GstHarness
+ * @forwarding: a #gboolean to enable/disable forwarding
+ *
+ * As a convenience, a src-harness will forward %GST_EVENT_STREAM_START,
+ * %GST_EVENT_CAPS and %GST_EVENT_SEGMENT to the main-harness if forwarding
+ * is enabled, and forward any sticky-events from the main-harness to
+ * the sink-harness. It will also forward the %GST_QUERY_ALLOCATION.
+ *
+ * If forwarding is disabled, the user will have to either manually push
+ * these events from the src-harness using gst_harness_src_push_event(), or
+ * create and push them manually. While this will allow full control and
+ * inspection of these events, for the most cases having forwarding enabled
+ * will be sufficient when writing a test where the src-harness' main function
+ * is providing data for the main-harness.
+ *
+ * Forwarding is enabled by default.
+ *
+ * MT safe.
+ *
+ * Since: 1.6
+ */
+void
+gst_harness_set_forwarding (GstHarness * h, gboolean forwarding)
+{
+  GstHarnessPrivate *priv = h->priv;
+  priv->forwarding = forwarding;
+  if (h->src_harness)
+    gst_harness_set_forwarding (h->src_harness, forwarding);
+  if (h->sink_harness)
+    gst_harness_set_forwarding (h->sink_harness, forwarding);
+}
+
+static void
+gst_harness_set_forward_pad (GstHarness * h, GstPad * fwdpad)
+{
+  HARNESS_LOCK (h);
+  gst_object_replace ((GstObject **) & h->priv->sink_forward_pad,
+      (GstObject *) fwdpad);
+  HARNESS_UNLOCK (h);
+}
+
+/**
  * gst_harness_create_buffer:
  * @h: a #GstHarness
  * @size: a #gsize specifying the size of the buffer
  *
  * Allocates a buffer using a #GstBufferPool if present, or else using the
- * configured #GstAllocator and #GstAllocatorParams
+ * configured #GstAllocator and #GstAllocationParams
  *
  * MT safe.
  *
@@ -1358,13 +1529,14 @@ gst_harness_create_buffer (GstHarness * h, gsize size)
 {
   GstHarnessPrivate *priv = h->priv;
   GstBuffer *ret = NULL;
+  GstFlowReturn flow;
 
   if (gst_pad_check_reconfigure (h->srcpad))
     gst_harness_negotiate (h);
 
   if (priv->pool) {
-    g_assert_cmpint (gst_buffer_pool_acquire_buffer (priv->pool, &ret, NULL), ==,
-        GST_FLOW_OK);
+    flow = gst_buffer_pool_acquire_buffer (priv->pool, &ret, NULL);
+    g_assert_cmpint (flow, ==, GST_FLOW_OK);
     if (gst_buffer_get_size (ret) != size) {
       GST_DEBUG_OBJECT (h,
           "use fallback, pool is configured with a different size (%zu != %zu)",
@@ -1375,7 +1547,9 @@ gst_harness_create_buffer (GstHarness * h, gsize size)
   }
 
   if (!ret)
-    ret = gst_buffer_new_allocate (priv->allocator, size, &priv->allocation_params);
+    ret =
+        gst_buffer_new_allocate (priv->allocator, size,
+        &priv->allocation_params);
 
   g_assert (ret != NULL);
   return ret;
@@ -1422,6 +1596,8 @@ GstBuffer *
 gst_harness_pull (GstHarness * h)
 {
   GstHarnessPrivate *priv = h->priv;
+  GstBuffer *buf = (GstBuffer *) g_async_queue_timeout_pop (priv->buffer_queue,
+      G_USEC_PER_SEC * 60);
 
   if (priv->blocking_push_mode) {
     g_mutex_lock (&priv->blocking_push_mutex);
@@ -1429,8 +1605,7 @@ gst_harness_pull (GstHarness * h)
     g_mutex_unlock (&priv->blocking_push_mutex);
   }
 
-  return (GstBuffer *) g_async_queue_timeout_pop (priv->buffer_queue,
-      G_USEC_PER_SEC * 60);
+  return buf;
 }
 
 /**
@@ -1451,6 +1626,7 @@ GstBuffer *
 gst_harness_try_pull (GstHarness * h)
 {
   GstHarnessPrivate *priv = h->priv;
+  GstBuffer *buf = (GstBuffer *) g_async_queue_try_pop (priv->buffer_queue);
 
   if (priv->blocking_push_mode) {
     g_mutex_lock (&priv->blocking_push_mutex);
@@ -1458,7 +1634,7 @@ gst_harness_try_pull (GstHarness * h)
     g_mutex_unlock (&priv->blocking_push_mutex);
   }
 
-  return (GstBuffer *) g_async_queue_try_pop (priv->buffer_queue);
+  return buf;
 }
 
 /**
@@ -1836,10 +2012,6 @@ gst_harness_query_latency (GstHarness * h)
  *
  * Sets the min latency reported by #GstHarness when receiving a latency-query
  *
- * MT safe.
- *
- * Returns: a #GstClockTime with min latency
- *
  * Since: 1.6
  */
 void
@@ -1852,10 +2024,9 @@ gst_harness_set_upstream_latency (GstHarness * h, GstClockTime latency)
 /**
  * gst_harness_get_allocator:
  * @h: a #GstHarness
- * @allocator: (out) (allow-none) (transfer none): the #GstAllocator
- * used
- * @params: (out) (allow-none) (transfer full): the
- * #GstAllocatorParams of @allocator
+ * @allocator: (out) (allow-none) (transfer none): the #GstAllocator used
+ * @params: (out) (allow-none) (transfer full): the #GstAllocationParams of
+ *   @allocator
  *
  * Gets the @allocator and its @params that has been decided to use after an
  * allocation query.
@@ -1900,18 +2071,10 @@ gst_harness_set_propose_allocator (GstHarness * h, GstAllocator * allocator,
     priv->propose_allocation_params = *params;
 }
 
-static void
-gst_harness_setup_src_harness (GstHarness * h, gboolean has_clock_wait)
-{
-  h->src_harness->priv->sink_forward_pad = gst_object_ref (h->srcpad);
-  gst_harness_use_testclock (h->src_harness);
-  h->src_harness->priv->has_clock_wait = has_clock_wait;
-}
-
 /**
- * gst_harness_add_src:
+ * gst_harness_add_src_harness:
  * @h: a #GstHarness
- * @src_element_name: a #gchar with the name of a #GstElement
+ * @src_harness: (transfer full): a #GstHarness to be added as a src-harness.
  * @has_clock_wait: a #gboolean specifying if the #GstElement uses
  * gst_clock_wait_id internally.
  *
@@ -1930,13 +2093,37 @@ gst_harness_setup_src_harness (GstHarness * h, gboolean has_clock_wait)
  * Since: 1.6
  */
 void
-gst_harness_add_src (GstHarness * h,
-    const gchar * src_element_name, gboolean has_clock_wait)
+gst_harness_add_src_harness (GstHarness * h,
+    GstHarness * src_harness, gboolean has_clock_wait)
 {
   if (h->src_harness)
     gst_harness_teardown (h->src_harness);
-  h->src_harness = gst_harness_new (src_element_name);
-  gst_harness_setup_src_harness (h, has_clock_wait);
+  h->src_harness = src_harness;
+  gst_harness_set_forward_pad (h->src_harness, h->srcpad);
+  h->src_harness->priv->has_clock_wait = has_clock_wait;
+  gst_harness_set_forwarding (h->src_harness, h->priv->forwarding);
+}
+
+/**
+ * gst_harness_add_src:
+ * @h: a #GstHarness
+ * @src_element_name: a #gchar with the name of a #GstElement
+ * @has_clock_wait: a #gboolean specifying if the #GstElement uses
+ * gst_clock_wait_id internally.
+ *
+ * Similar to gst_harness_add_src_harness, this is a convenience to
+ * directly create a src-harness using the @src_element_name name specified.
+ *
+ * MT safe.
+ *
+ * Since: 1.6
+ */
+void
+gst_harness_add_src (GstHarness * h,
+    const gchar * src_element_name, gboolean has_clock_wait)
+{
+  GstHarness *src_harness = gst_harness_new (src_element_name);
+  gst_harness_add_src_harness (h, src_harness, has_clock_wait);
 }
 
 /**
@@ -1960,10 +2147,8 @@ void
 gst_harness_add_src_parse (GstHarness * h,
     const gchar * launchline, gboolean has_clock_wait)
 {
-  if (h->src_harness)
-    gst_harness_teardown (h->src_harness);
-  h->src_harness = gst_harness_new_parse (launchline);
-  gst_harness_setup_src_harness (h, has_clock_wait);
+  GstHarness *src_harness = gst_harness_new_parse (launchline);
+  gst_harness_add_src_harness (h, src_harness, has_clock_wait);
 }
 
 /**
@@ -1987,6 +2172,7 @@ GstFlowReturn
 gst_harness_push_from_src (GstHarness * h)
 {
   GstBuffer *buf;
+  gboolean crank;
 
   g_assert (h->src_harness);
 
@@ -1995,10 +2181,12 @@ gst_harness_push_from_src (GstHarness * h)
   gst_harness_play (h->src_harness);
 
   if (h->src_harness->priv->has_clock_wait) {
-    g_assert (gst_harness_crank_single_clock_wait (h->src_harness));
+    crank = gst_harness_crank_single_clock_wait (h->src_harness);
+    g_assert (crank);
   }
 
-  g_assert ((buf = gst_harness_pull (h->src_harness)) != NULL);
+  buf = gst_harness_pull (h->src_harness);
+  g_assert (buf != NULL);
   return gst_harness_push (h, buf);
 }
 
@@ -2024,16 +2212,21 @@ GstFlowReturn
 gst_harness_src_crank_and_push_many (GstHarness * h, gint cranks, gint pushes)
 {
   GstFlowReturn ret = GST_FLOW_OK;
+  gboolean crank;
+  int i;
 
   g_assert (h->src_harness);
   gst_harness_play (h->src_harness);
 
-  for (int i = 0; i < cranks; i++)
-    g_assert (gst_harness_crank_single_clock_wait (h->src_harness));
+  for (i = 0; i < cranks; i++) {
+    crank = gst_harness_crank_single_clock_wait (h->src_harness);
+    g_assert (crank);
+  }
 
-  for (int i = 0; i < pushes; i++) {
+  for (i = 0; i < pushes; i++) {
     GstBuffer *buf;
-    g_assert ((buf = gst_harness_pull (h->src_harness)) != NULL);
+    buf = gst_harness_pull (h->src_harness);
+    g_assert (buf != NULL);
     ret = gst_harness_push (h, buf);
     if (ret != GST_FLOW_OK)
       break;
@@ -2063,10 +2256,18 @@ gst_harness_src_push_event (GstHarness * h)
   return gst_harness_push_event (h, gst_harness_pull_event (h->src_harness));
 }
 
+
+static gboolean
+forward_sticky_events (GstPad * pad, GstEvent ** ev, gpointer user_data)
+{
+  GstHarness *h = user_data;
+  return gst_pad_push_event (h->priv->sink_forward_pad, gst_event_ref (*ev));
+}
+
 /**
- * gst_harness_add_sink:
+ * gst_harness_add_sink_harness:
  * @h: a #GstHarness
- * @sink_element_name: a #gchar with the name of a #GstElement
+ * @sink_harness: (transfer full): a #GstHarness to be added as a sink-harness.
  *
  * Similar to gst_harness_add_src, this allows you to send the data coming out
  * of your harnessed #GstElement to a sink-element, allowing to test different
@@ -2082,16 +2283,38 @@ gst_harness_src_push_event (GstHarness * h)
  * Since: 1.6
  */
 void
-gst_harness_add_sink (GstHarness * h, const gchar * sink_element_name)
+gst_harness_add_sink_harness (GstHarness * h, GstHarness * sink_harness)
 {
   GstHarnessPrivate *priv = h->priv;
 
   if (h->sink_harness) {
+    gst_harness_set_forward_pad (h, NULL);
     gst_harness_teardown (h->sink_harness);
-    gst_object_unref (priv->sink_forward_pad);
   }
-  h->sink_harness = gst_harness_new (sink_element_name);
-  priv->sink_forward_pad = gst_object_ref (h->sink_harness->srcpad);
+  h->sink_harness = sink_harness;
+  gst_harness_set_forward_pad (h, h->sink_harness->srcpad);
+  if (priv->forwarding && h->sinkpad)
+    gst_pad_sticky_events_foreach (h->sinkpad, forward_sticky_events, h);
+  gst_harness_set_forwarding (h->sink_harness, priv->forwarding);
+}
+
+/**
+ * gst_harness_add_sink:
+ * @h: a #GstHarness
+ * @sink_element_name: a #gchar with the name of a #GstElement
+ *
+ * Similar to gst_harness_add_sink_harness, this is a convenience to
+ * directly create a sink-harness using the @sink_element_name name specified.
+ *
+ * MT safe.
+ *
+ * Since: 1.6
+ */
+void
+gst_harness_add_sink (GstHarness * h, const gchar * sink_element_name)
+{
+  GstHarness *sink_harness = gst_harness_new (sink_element_name);
+  gst_harness_add_sink_harness (h, sink_harness);
 }
 
 /**
@@ -2109,14 +2332,8 @@ gst_harness_add_sink (GstHarness * h, const gchar * sink_element_name)
 void
 gst_harness_add_sink_parse (GstHarness * h, const gchar * launchline)
 {
-  GstHarnessPrivate *priv = h->priv;
-
-  if (h->sink_harness) {
-    gst_harness_teardown (h->sink_harness);
-    gst_object_unref (priv->sink_forward_pad);
-  }
-  h->sink_harness = gst_harness_new_parse (launchline);
-  priv->sink_forward_pad = gst_object_ref (h->sink_harness->srcpad);
+  GstHarness *sink_harness = gst_harness_new_parse (launchline);
+  gst_harness_add_sink_harness (h, sink_harness);
 }
 
 /**
@@ -2137,7 +2354,8 @@ gst_harness_push_to_sink (GstHarness * h)
 {
   GstBuffer *buf;
   g_assert (h->sink_harness);
-  g_assert ((buf = gst_harness_pull (h)) != NULL);
+  buf = gst_harness_pull (h);
+  g_assert (buf != NULL);
   return gst_harness_push (h->sink_harness, buf);
 }
 
@@ -2159,8 +2377,9 @@ GstFlowReturn
 gst_harness_sink_push_many (GstHarness * h, gint pushes)
 {
   GstFlowReturn ret = GST_FLOW_OK;
+  int i;
   g_assert (h->sink_harness);
-  for (int i = 0; i < pushes; i++) {
+  for (i = 0; i < pushes; i++) {
     ret = gst_harness_push_to_sink (h);
     if (ret != GST_FLOW_OK)
       break;
@@ -2338,7 +2557,7 @@ typedef struct
 
   GstCaps *caps;
   GstSegment segment;
-  GstHarnessPrepareBuffer func;
+  GstHarnessPrepareBufferFunc func;
   gpointer data;
   GDestroyNotify notify;
 } GstHarnessPushBufferThread;
@@ -2347,7 +2566,9 @@ typedef struct
 {
   GstHarnessThread t;
 
-  GstEvent *event;
+  GstHarnessPrepareEventFunc func;
+  gpointer data;
+  GDestroyNotify notify;
 } GstHarnessPushEventThread;
 
 typedef struct
@@ -2408,7 +2629,8 @@ static void
 gst_harness_push_event_thread_free (GstHarnessPushEventThread * t)
 {
   if (t != NULL) {
-    gst_event_replace (&t->event, NULL);
+    if (t->notify != NULL)
+      t->notify (t->data);
     g_slice_free (GstHarnessPushEventThread, t);
   }
 }
@@ -2461,22 +2683,6 @@ gst_harness_requestpad_thread_free (GstHarnessReqPadThread * t)
    (t->running = FALSE,                                                        \
    GPOINTER_TO_UINT (g_thread_join (t->thread)))
 
-#define GST_HARNESS_STRESS_FUNC_BEGIN(ID, INIT)                                \
-  static gpointer                                                              \
-  gst_harness_stress_##ID##_func (GstHarnessThread * t)                        \
-  {                                                                            \
-    guint count = 0;                                                           \
-    INIT;                                                                      \
-                                                                               \
-    while (t->running) {
-
-#define GST_HARNESS_STRESS_FUNC_END()                                          \
-      count++;                                                                 \
-      g_usleep (t->sleep);                                                     \
-    }                                                                          \
-    return GUINT_TO_POINTER (count);                                           \
-  }
-
 static void
 gst_harness_stress_free (GstHarnessThread * t)
 {
@@ -2484,127 +2690,187 @@ gst_harness_stress_free (GstHarnessThread * t)
     t->freefunc (t);
 }
 
-GST_HARNESS_STRESS_FUNC_BEGIN (custom, {
-    GstHarnessCustomThread *ct = (GstHarnessCustomThread *) t;
-    ct->init (ct, ct->data);
-  }
-)
+static gpointer
+gst_harness_stress_custom_func (GstHarnessThread * t)
 {
   GstHarnessCustomThread *ct = (GstHarnessCustomThread *) t;
-  ct->callback (ct, ct->data);
-}
-GST_HARNESS_STRESS_FUNC_END ()
+  guint count = 0;
 
-GST_HARNESS_STRESS_FUNC_BEGIN (statechange, {})
-{
-  GstClock *clock = gst_element_get_clock (t->h->element);
-  GstIterator *it;
-  gboolean done = FALSE;
+  if (ct->init != NULL)
+    ct->init (ct, ct->data);
 
-  g_assert (gst_element_set_state (t->h->element, GST_STATE_NULL) ==
-      GST_STATE_CHANGE_SUCCESS);
-  g_thread_yield ();
+  while (t->running) {
+    ct->callback (ct, ct->data);
 
-  it = gst_element_iterate_sink_pads (t->h->element);
-  while (!done) {
-    GValue item = G_VALUE_INIT;
-    switch (gst_iterator_next (it, &item)) {
-      case GST_ITERATOR_OK:
-      {
-        GstPad *sinkpad = g_value_get_object (&item);
-        GstPad *srcpad = gst_pad_get_peer (sinkpad);
-        if (srcpad != NULL) {
-          gst_pad_unlink (srcpad, sinkpad);
-          gst_pad_link (srcpad, sinkpad);
-          gst_object_unref (srcpad);
+    count++;
+    g_usleep (t->sleep);
+  }
+  return GUINT_TO_POINTER (count);
+}
+
+
+static gpointer
+gst_harness_stress_statechange_func (GstHarnessThread * t)
+{
+  guint count = 0;
+
+  while (t->running) {
+    GstClock *clock = gst_element_get_clock (t->h->element);
+    GstIterator *it;
+    gboolean done = FALSE;
+    gboolean change;
+
+    change = gst_element_set_state (t->h->element, GST_STATE_NULL);
+    g_assert (change == GST_STATE_CHANGE_SUCCESS);
+    g_thread_yield ();
+
+    it = gst_element_iterate_sink_pads (t->h->element);
+    while (!done) {
+      GValue item = G_VALUE_INIT;
+      switch (gst_iterator_next (it, &item)) {
+        case GST_ITERATOR_OK:
+        {
+          GstPad *sinkpad = g_value_get_object (&item);
+          GstPad *srcpad = gst_pad_get_peer (sinkpad);
+          if (srcpad != NULL) {
+            gst_pad_unlink (srcpad, sinkpad);
+            gst_pad_link (srcpad, sinkpad);
+            gst_object_unref (srcpad);
+          }
+          g_value_reset (&item);
+          break;
         }
-        g_value_reset (&item);
-        break;
+        case GST_ITERATOR_RESYNC:
+          gst_iterator_resync (it);
+          break;
+        case GST_ITERATOR_ERROR:
+          g_assert_not_reached ();
+        case GST_ITERATOR_DONE:
+          done = TRUE;
+          break;
       }
-      case GST_ITERATOR_RESYNC:
-        gst_iterator_resync (it);
-        break;
-      case GST_ITERATOR_ERROR:
-        g_assert_not_reached ();
-      case GST_ITERATOR_DONE:
-        done = TRUE;
-        break;
+      g_value_unset (&item);
     }
-    g_value_unset (&item);
-  }
-  gst_iterator_free (it);
+    gst_iterator_free (it);
 
-  if (clock != NULL) {
-    gst_element_set_clock (t->h->element, clock);
-    gst_object_unref (clock);
-  }
-  g_assert (gst_element_set_state (t->h->element, GST_STATE_PLAYING) ==
-      GST_STATE_CHANGE_SUCCESS);
-}
-GST_HARNESS_STRESS_FUNC_END ()
-
-GST_HARNESS_STRESS_FUNC_BEGIN (buffer, {
-    GstHarnessPushBufferThread *pt = (GstHarnessPushBufferThread *) t;
-    gchar *sid;
-    /* Push stream start, caps and segment events */
-    sid = g_strdup_printf ("%s-%p", GST_OBJECT_NAME (t->h->element), t->h);
-    g_assert (gst_pad_push_event (t->h->srcpad,
-        gst_event_new_stream_start (sid))); g_free (sid);
-    g_assert (gst_pad_push_event (t->h->srcpad,
-        gst_event_new_caps (pt->caps)));
-    g_assert (gst_pad_push_event (t->h->srcpad,
-        gst_event_new_segment (&pt->segment)));
+    if (clock != NULL) {
+      gst_element_set_clock (t->h->element, clock);
+      gst_object_unref (clock);
+    }
+    change = gst_element_set_state (t->h->element, GST_STATE_PLAYING);
+    g_assert (change == GST_STATE_CHANGE_SUCCESS);
+
+    count++;
+    g_usleep (t->sleep);
   }
-)
+  return GUINT_TO_POINTER (count);
+}
+
+static gpointer
+gst_harness_stress_buffer_func (GstHarnessThread * t)
 {
   GstHarnessPushBufferThread *pt = (GstHarnessPushBufferThread *) t;
-  gst_harness_push (t->h, pt->func (t->h, pt->data));
+  guint count = 0;
+  gchar *sid;
+  gboolean handled;
+
+  /* Push stream start, caps and segment events */
+  sid = g_strdup_printf ("%s-%p", GST_OBJECT_NAME (t->h->element), t->h);
+  handled = gst_pad_push_event (t->h->srcpad, gst_event_new_stream_start (sid));
+  g_assert (handled);
+  g_free (sid);
+  handled = gst_pad_push_event (t->h->srcpad, gst_event_new_caps (pt->caps));
+  g_assert (handled);
+  handled = gst_pad_push_event (t->h->srcpad,
+      gst_event_new_segment (&pt->segment));
+  g_assert (handled);
+
+  while (t->running) {
+    gst_harness_push (t->h, pt->func (t->h, pt->data));
+
+    count++;
+    g_usleep (t->sleep);
+  }
+  return GUINT_TO_POINTER (count);
 }
-GST_HARNESS_STRESS_FUNC_END ()
 
-GST_HARNESS_STRESS_FUNC_BEGIN (event, {})
+static gpointer
+gst_harness_stress_event_func (GstHarnessThread * t)
 {
   GstHarnessPushEventThread *pet = (GstHarnessPushEventThread *) t;
-  gst_harness_push_event (t->h, gst_event_ref (pet->event));
+  guint count = 0;
+
+  while (t->running) {
+    gst_harness_push_event (t->h, pet->func (t->h, pet->data));
+
+    count++;
+    g_usleep (t->sleep);
+  }
+  return GUINT_TO_POINTER (count);
 }
-GST_HARNESS_STRESS_FUNC_END ()
 
-GST_HARNESS_STRESS_FUNC_BEGIN (upstream_event, {})
+static gpointer
+gst_harness_stress_upstream_event_func (GstHarnessThread * t)
 {
   GstHarnessPushEventThread *pet = (GstHarnessPushEventThread *) t;
-  gst_harness_push_upstream_event (t->h, gst_event_ref (pet->event));
+  guint count = 0;
+
+  while (t->running) {
+    gst_harness_push_upstream_event (t->h, pet->func (t->h, pet->data));
+
+    count++;
+    g_usleep (t->sleep);
+  }
+  return GUINT_TO_POINTER (count);
 }
-GST_HARNESS_STRESS_FUNC_END ()
 
-GST_HARNESS_STRESS_FUNC_BEGIN (property, {})
+static gpointer
+gst_harness_stress_property_func (GstHarnessThread * t)
 {
   GstHarnessPropThread *pt = (GstHarnessPropThread *) t;
-  GValue value = G_VALUE_INIT;
+  guint count = 0;
+
+  while (t->running) {
+    GValue value = G_VALUE_INIT;
 
-  g_object_set_property (G_OBJECT (t->h->element), pt->name, &pt->value);
+    g_object_set_property (G_OBJECT (t->h->element), pt->name, &pt->value);
 
-  g_value_init (&value, G_VALUE_TYPE (&pt->value));
-  g_object_get_property (G_OBJECT (t->h->element), pt->name, &value);
-  g_value_reset (&value);
+    g_value_init (&value, G_VALUE_TYPE (&pt->value));
+    g_object_get_property (G_OBJECT (t->h->element), pt->name, &value);
+    g_value_reset (&value);
+
+    count++;
+    g_usleep (t->sleep);
+  }
+  return GUINT_TO_POINTER (count);
 }
-GST_HARNESS_STRESS_FUNC_END ()
 
-GST_HARNESS_STRESS_FUNC_BEGIN (requestpad, {})
+static gpointer
+gst_harness_stress_requestpad_func (GstHarnessThread * t)
 {
   GstHarnessReqPadThread *rpt = (GstHarnessReqPadThread *) t;
-  GstPad *reqpad;
+  guint count = 0;
 
-  if (rpt->release)
-    gst_harness_requestpad_release_pads (rpt);
-  g_thread_yield ();
+  while (t->running) {
+    GstPad *reqpad;
 
-  reqpad = gst_element_request_pad (t->h->element,
-      rpt->templ, rpt->name, rpt->caps);
-  g_assert (reqpad != NULL);
+    if (rpt->release)
+      gst_harness_requestpad_release_pads (rpt);
 
-  rpt->pads = g_slist_prepend (rpt->pads, reqpad);
+    g_thread_yield ();
+
+    reqpad = gst_element_request_pad (t->h->element,
+        rpt->templ, rpt->name, rpt->caps);
+
+    g_assert (reqpad != NULL);
+
+    rpt->pads = g_slist_prepend (rpt->pads, reqpad);
+
+    count++;
+    g_usleep (t->sleep);
+  }
+  return GUINT_TO_POINTER (count);
 }
-GST_HARNESS_STRESS_FUNC_END ()
 
 /**
  * gst_harness_stress_thread_stop:
@@ -2631,7 +2897,7 @@ gst_harness_stress_thread_stop (GstHarnessThread * t)
 /**
  * gst_harness_stress_custom_start: (skip)
  * @h: a #GstHarness
- * @init: a #GFunc that is called initially and only once
+ * @init: (allow-none): a #GFunc that is called initially and only once
  * @callback: a #GFunc that is called as often as possible
  * @data: a #gpointer with custom data to pass to the @callback function
  * @sleep: a #gulong specifying how long to sleep in (microseconds) for
@@ -2694,6 +2960,13 @@ gst_harness_ref_buffer (GstHarness * h, gpointer data)
   return gst_buffer_ref (GST_BUFFER_CAST (data));
 }
 
+static GstEvent *
+gst_harness_ref_event (GstHarness * h, gpointer data)
+{
+  (void) h;
+  return gst_event_ref (GST_EVENT_CAST (data));
+}
+
 /**
  * gst_harness_stress_push_buffer_start_full: (skip)
  * @h: a #GstHarness
@@ -2725,15 +2998,14 @@ gst_harness_stress_push_buffer_start_full (GstHarness * h,
  * @h: a #GstHarness
  * @caps: a #GstCaps for the #GstBuffer
  * @segment: a #GstSegment
- * @func: a #GstHarnessPrepareBuffer function called before every iteration
+ * @func: a #GstHarnessPrepareBufferFunc function called before every iteration
  * to prepare / create a #GstBuffer for pushing
- * @data: a #gpointer with data to the #GstHarnessPrepareBuffer function
- * @notify: a #GDestroyNotify that is called for every push to allow cleaning
- * up the #GstBuffer. (like gst_buffer_unref)
+ * @data: a #gpointer with data to the #GstHarnessPrepareBufferFunc function
+ * @notify: a #GDestroyNotify that is called when thread is stopped
  * @sleep: a #gulong specifying how long to sleep in (microseconds) for
  * each call to gst_pad_push
  *
- * Push a #GstBuffer in intervals of @sleep microseconds.
+ * Push a #GstBuffer returned by @func in intervals of @sleep microseconds.
  *
  * MT safe.
  *
@@ -2744,7 +3016,7 @@ gst_harness_stress_push_buffer_start_full (GstHarness * h,
 GstHarnessThread *
 gst_harness_stress_push_buffer_with_cb_start_full (GstHarness * h,
     GstCaps * caps, const GstSegment * segment,
-    GstHarnessPrepareBuffer func, gpointer data, GDestroyNotify notify,
+    GstHarnessPrepareBufferFunc func, gpointer data, GDestroyNotify notify,
     gulong sleep)
 {
   GstHarnessPushBufferThread *t = g_slice_new0 (GstHarnessPushBufferThread);
@@ -2781,11 +3053,43 @@ GstHarnessThread *
 gst_harness_stress_push_event_start_full (GstHarness * h,
     GstEvent * event, gulong sleep)
 {
+  return gst_harness_stress_push_event_with_cb_start_full (h,
+      gst_harness_ref_event, gst_event_ref (event),
+      (GDestroyNotify) gst_event_unref, sleep);
+}
+
+/**
+ * gst_harness_stress_push_event_with_cb_start_full: (skip)
+ * @h: a #GstHarness
+ * @func: a #GstHarnessPrepareEventFunc function called before every iteration
+ * to prepare / create a #GstEvent for pushing
+ * @data: a #gpointer with data to the #GstHarnessPrepareEventFunc function
+ * @notify: a #GDestroyNotify that is called when thread is stopped
+ * @sleep: a #gulong specifying how long to sleep in (microseconds) for
+ * each call to gst_pad_push
+ *
+ * Push a #GstEvent returned by @func onto the harnessed #GstElement sinkpad
+ * in intervals of @sleep microseconds.
+ *
+ * MT safe.
+ *
+ * Returns: a #GstHarnessThread
+ *
+ * Since: 1.8
+ */
+GstHarnessThread *
+gst_harness_stress_push_event_with_cb_start_full (GstHarness * h,
+    GstHarnessPrepareEventFunc func, gpointer data, GDestroyNotify notify,
+    gulong sleep)
+{
   GstHarnessPushEventThread *t = g_slice_new0 (GstHarnessPushEventThread);
   gst_harness_thread_init (&t->t,
       (GDestroyNotify) gst_harness_push_event_thread_free, h, sleep);
 
-  t->event = gst_event_ref (event);
+  t->func = func;
+  t->data = data;
+  t->notify = notify;
+
   GST_HARNESS_THREAD_START (event, t);
   return &t->t;
 }
@@ -2799,9 +3103,6 @@ gst_harness_stress_push_event_start_full (GstHarness * h,
  *
  * Push the @event onto the harnessed #GstElement srcpad in intervals of
  * @sleep microseconds.
- * Pushing events should generally be OOB events.
- * If you need serialized events, you may use a custom stress thread which
- * both pushes buffers and events.
  *
  * MT safe.
  *
@@ -2813,11 +3114,43 @@ GstHarnessThread *
 gst_harness_stress_push_upstream_event_start_full (GstHarness * h,
     GstEvent * event, gulong sleep)
 {
+  return gst_harness_stress_push_upstream_event_with_cb_start_full (h,
+      gst_harness_ref_event, gst_event_ref (event),
+      (GDestroyNotify) gst_event_unref, sleep);
+}
+
+/**
+ * gst_harness_stress_push_upstream_event_with_cb_start_full: (skip)
+ * @h: a #GstHarness
+ * @func: a #GstHarnessPrepareEventFunc function called before every iteration
+ * to prepare / create a #GstEvent for pushing
+ * @data: a #gpointer with data to the #GstHarnessPrepareEventFunc function
+ * @notify: a #GDestroyNotify that is called when thread is stopped
+ * @sleep: a #gulong specifying how long to sleep in (microseconds) for
+ * each call to gst_pad_push
+ *
+ * Push a #GstEvent returned by @func onto the harnessed #GstElement srcpad
+ * in intervals of @sleep microseconds.
+ *
+ * MT safe.
+ *
+ * Returns: a #GstHarnessThread
+ *
+ * Since: 1.8
+ */
+GstHarnessThread *
+gst_harness_stress_push_upstream_event_with_cb_start_full (GstHarness * h,
+    GstHarnessPrepareEventFunc func, gpointer data, GDestroyNotify notify,
+    gulong sleep)
+{
   GstHarnessPushEventThread *t = g_slice_new0 (GstHarnessPushEventThread);
   gst_harness_thread_init (&t->t,
       (GDestroyNotify) gst_harness_push_event_thread_free, h, sleep);
 
-  t->event = gst_event_ref (event);
+  t->func = func;
+  t->data = data;
+  t->notify = notify;
+
   GST_HARNESS_THREAD_START (upstream_event, t);
   return &t->t;
 }