appsink: add API to catch events
authorGuillaume Desmottes <guillaume.desmottes@collabora.com>
Fri, 19 Feb 2021 12:32:48 +0000 (13:32 +0100)
committerGuillaume Desmottes <guillaume.desmottes@collabora.com>
Thu, 22 Jul 2021 11:56:22 +0000 (13:56 +0200)
There is currently no way for users to receive incoming events from
appsink while keeping them properly serialized with the buffers flow.
This can be especially useful when application is injecting custom
downstream events into the pipeline and needs to know when they reached
appsink.

Solving this by adding a new signal notifying about new incoming events
and a set of action signals and method to pull those events.
The API is actually pulling the samples and events all together as they
are actually fetched from the same queue.
Having a specific API to pull only events would have the side effect of
discarding samples (and pulling samples would discard events) making
this API not convenient for users.

Partially fix #247

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/merge_requests/1046>

docs/plugins/gst_plugins_cache.json
gst-libs/gst/app/gstappsink.c
gst-libs/gst/app/gstappsink.h
tests/check/elements/appsink.c

index 3cf3090..73789d3 100644 (file)
                         "return-type": "GstFlowReturn",
                         "when": "last"
                     },
+                    "new-serialized-event": {
+                        "args": [],
+                        "return-type": "gboolean",
+                        "when": "last"
+                    },
                     "pull-preroll": {
                         "action": true,
                         "args": [],
                         "return-type": "GstSample",
                         "when": "last"
                     },
+                    "try-pull-object": {
+                        "action": true,
+                        "args": [
+                            {
+                                "name": "arg0",
+                                "type": "guint64"
+                            }
+                        ],
+                        "return-type": "GstMiniObject",
+                        "when": "last"
+                    },
                     "try-pull-preroll": {
                         "action": true,
                         "args": [
index 65822c8..16da11c 100644 (file)
@@ -113,6 +113,7 @@ struct _GstAppSinkPrivate
   GstCaps *caps;
   gboolean emit_signals;
   guint num_buffers;
+  guint num_events;
   guint max_buffers;
   gboolean drop;
   gboolean wait_on_eos;
@@ -146,12 +147,14 @@ enum
   SIGNAL_EOS,
   SIGNAL_NEW_PREROLL,
   SIGNAL_NEW_SAMPLE,
+  SIGNAL_NEW_SERIALIZED_EVENT,
 
   /* actions */
   SIGNAL_PULL_PREROLL,
   SIGNAL_PULL_SAMPLE,
   SIGNAL_TRY_PULL_PREROLL,
   SIGNAL_TRY_PULL_SAMPLE,
+  SIGNAL_TRY_PULL_OBJECT,
 
   LAST_SIGNAL
 };
@@ -333,6 +336,34 @@ gst_app_sink_class_init (GstAppSinkClass * klass)
       NULL, NULL, NULL, GST_TYPE_FLOW_RETURN, 0, G_TYPE_NONE);
 
   /**
+   * GstAppSink::new-serialized-event:
+   * @appsink: the appsink element that emitted the signal
+   *
+   * Signal that a new downstream serialized event is available.
+   *
+   * This signal is emitted from the streaming thread and only when the
+   * "emit-signals" property is %TRUE.
+   *
+   * The new event can be retrieved with the "try-pull-object" action
+   * signal or gst_app_sink_pull_object() either from this signal callback
+   * or from any other thread.
+   *
+   * EOS will not be notified using this signal, use #GstAppSink::eos instead.
+   * EOS cannot be pulled either, use gst_app_sink_is_eos() to check for it.
+   *
+   * Note that this signal is only emitted when the "emit-signals" property is
+   * set to %TRUE, which it is not by default for performance reasons.
+   *
+   * The callback should return %TRUE if the event has been handled, which will
+   * skip basesink handling of the event, %FALSE otherwise.
+   *
+   * Since: 1.20
+   */
+  gst_app_sink_signals[SIGNAL_NEW_SERIALIZED_EVENT] =
+      g_signal_new ("new-serialized-event", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_BOOLEAN, 0, G_TYPE_NONE);
+
+  /**
    * GstAppSink::pull-preroll:
    * @appsink: the appsink element to emit this signal on
    *
@@ -386,6 +417,7 @@ gst_app_sink_class_init (GstAppSinkClass * klass)
       g_signal_new ("pull-sample", G_TYPE_FROM_CLASS (klass),
       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSinkClass,
           pull_sample), NULL, NULL, NULL, GST_TYPE_SAMPLE, 0, G_TYPE_NONE);
+
   /**
    * GstAppSink::try-pull-preroll:
    * @appsink: the appsink element to emit this signal on
@@ -451,6 +483,44 @@ gst_app_sink_class_init (GstAppSinkClass * klass)
       G_STRUCT_OFFSET (GstAppSinkClass, try_pull_sample), NULL, NULL, NULL,
       GST_TYPE_SAMPLE, 1, GST_TYPE_CLOCK_TIME);
 
+  /**
+   * GstAppSink::try-pull-object:
+   * @appsink: the appsink element to emit this signal on
+   * @timeout: the maximum amount of time to wait for a sample
+   *
+   * This function blocks until a sample or an event becomes available or the appsink
+   * element is set to the READY/NULL state or the timeout expires.
+   *
+   * This function will only return samples when the appsink is in the PLAYING
+   * state. All rendered samples and events will be put in a queue so that the application
+   * can pull them at its own rate.
+   * Events can be pulled when the appsink is in the READY, PAUSED or PLAYING state.
+   *
+   * Note that when the application does not pull samples fast enough, the
+   * queued samples could consume a lot of memory, especially when dealing with
+   * raw video frames. It's possible to control the behaviour of the queue with
+   * the "drop" and "max-buffers" properties.
+   *
+   * This function will only pull serialized events, excluding
+   * the EOS event for which this functions returns
+   * %NULL. Use gst_app_sink_is_eos() to check for the EOS condition.
+   *
+   * This signal is a variant of #GstAppSink::try-pull-sample: that can be used
+   * to handle incoming events as well as samples.
+   *
+   * Note that future releases may extend this API to return other object types
+   * so make sure that your code is checking for the actual type it is handling.
+   *
+   * Returns: (transfer full): a #GstSample or a #GstEvent or NULL when the appsink is stopped or EOS or the timeout expires.
+   *
+   * Since: 1.20
+   */
+  gst_app_sink_signals[SIGNAL_TRY_PULL_OBJECT] =
+      g_signal_new ("try-pull-object", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+      G_STRUCT_OFFSET (GstAppSinkClass, try_pull_object), NULL, NULL, NULL,
+      GST_TYPE_MINI_OBJECT, 1, GST_TYPE_CLOCK_TIME);
+
   gst_element_class_set_static_metadata (element_class, "AppSink",
       "Generic/Sink", "Allow the application to get access to raw buffer",
       "David Schleef <ds@schleef.org>, Wim Taymans <wim.taymans@gmail.com>");
@@ -474,6 +544,7 @@ gst_app_sink_class_init (GstAppSinkClass * klass)
   klass->pull_sample = gst_app_sink_pull_sample;
   klass->try_pull_preroll = gst_app_sink_try_pull_preroll;
   klass->try_pull_sample = gst_app_sink_try_pull_sample;
+  klass->try_pull_object = gst_app_sink_try_pull_object;
 }
 
 static void
@@ -659,6 +730,7 @@ gst_app_sink_flush_unlocked (GstAppSink * appsink)
   while ((obj = gst_queue_array_pop_head (priv->queue)))
     gst_mini_object_unref (obj);
   priv->num_buffers = 0;
+  priv->num_events = 0;
   g_cond_signal (&priv->cond);
 }
 
@@ -716,6 +788,7 @@ gst_app_sink_setcaps (GstBaseSink * sink, GstCaps * caps)
   g_mutex_lock (&priv->mutex);
   GST_DEBUG_OBJECT (appsink, "receiving CAPS");
   gst_queue_array_push_tail (priv->queue, gst_event_new_caps (caps));
+  priv->num_events++;
   if (!priv->preroll_buffer)
     gst_caps_replace (&priv->preroll_caps, caps);
   g_mutex_unlock (&priv->mutex);
@@ -729,11 +802,12 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event)
   GstAppSink *appsink = GST_APP_SINK_CAST (sink);
   GstAppSinkPrivate *priv = appsink->priv;
 
+  GST_DEBUG_OBJECT (appsink, "%" GST_PTR_FORMAT, event);
+
   switch (event->type) {
     case GST_EVENT_SEGMENT:
       g_mutex_lock (&priv->mutex);
       GST_DEBUG_OBJECT (appsink, "receiving SEGMENT");
-      gst_queue_array_push_tail (priv->queue, gst_event_ref (event));
       if (!priv->preroll_buffer)
         gst_event_copy_segment (event, &priv->preroll_segment);
       g_mutex_unlock (&priv->mutex);
@@ -805,6 +879,40 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event)
     default:
       break;
   }
+
+  if (GST_EVENT_TYPE (event) != GST_EVENT_EOS
+      && GST_EVENT_IS_SERIALIZED (event)) {
+    gboolean emit;
+    Callbacks *callbacks = NULL;
+    gboolean ret;
+
+    g_mutex_lock (&priv->mutex);
+
+    emit = priv->emit_signals;
+    if (priv->callbacks)
+      callbacks = callbacks_ref (priv->callbacks);
+
+    gst_queue_array_push_tail (priv->queue, gst_event_ref (event));
+    priv->num_events++;
+
+    g_mutex_unlock (&priv->mutex);
+
+    if (callbacks && callbacks->callbacks.new_event) {
+      ret = callbacks->callbacks.new_event (appsink, callbacks->user_data);
+    } else {
+      ret = FALSE;
+      if (emit)
+        g_signal_emit (appsink,
+            gst_app_sink_signals[SIGNAL_NEW_SERIALIZED_EVENT], 0, &ret);
+    }
+    g_clear_pointer (&callbacks, callbacks_unref);
+
+    if (ret) {
+      gst_event_unref (event);
+      return TRUE;
+    }
+  }
+
   return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
 }
 
@@ -867,6 +975,8 @@ dequeue_object (GstAppSink * appsink)
   } else if (GST_IS_EVENT (obj)) {
     GstEvent *event = GST_EVENT_CAST (obj);
 
+    priv->num_events--;
+
     switch (GST_EVENT_TYPE (obj)) {
       case GST_EVENT_CAPS:
       {
@@ -1548,6 +1658,41 @@ gst_app_sink_pull_sample (GstAppSink * appsink)
 }
 
 /**
+ * gst_app_sink_pull_object: (skip)
+ * @appsink: a #GstAppSink
+ *
+ * This function blocks until a sample or an event becomes available or the appsink
+ * element is set to the READY/NULL state.
+ *
+ * This function will only return samples when the appsink is in the PLAYING
+ * state. All rendered buffers and events will be put in a queue so that the application
+ * can pull them at its own rate. Note that when the application does not
+ * pull samples fast enough, the queued buffers could consume a lot of memory,
+ * especially when dealing with raw video frames.
+ * Events can be pulled when the appsink is in the READY, PAUSED or PLAYING state.
+ *
+ * This function will only pull serialized events, excluding
+ * the EOS event for which this functions returns
+ * %NULL. Use gst_app_sink_is_eos() to check for the EOS condition.
+ *
+ * This method is a variant of gst_app_sink_pull_sample() that can be used
+ * to handle incoming events events as well as samples.
+ *
+ * Note that future releases may extend this API to return other object types
+ * so make sure that your code is checking for the actual type it is handling.
+ *
+ * Returns: (transfer full): a #GstSample, or a #GstEvent or NULL when the appsink is stopped or EOS.
+ *          Call gst_mini_object_unref() after usage.
+ *
+ * Since: 1.20
+ */
+GstMiniObject *
+gst_app_sink_pull_object (GstAppSink * appsink)
+{
+  return gst_app_sink_try_pull_object (appsink, GST_CLOCK_TIME_NONE);
+}
+
+/**
  * gst_app_sink_try_pull_preroll:
  * @appsink: a #GstAppSink
  * @timeout: the maximum amount of time to wait for the preroll sample
@@ -1676,9 +1821,56 @@ not_started:
 GstSample *
 gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout)
 {
+  while (TRUE) {
+    GstMiniObject *obj;
+
+    obj = gst_app_sink_try_pull_object (appsink, timeout);
+
+    if (!obj) {
+      return NULL;
+    } else if (GST_IS_SAMPLE (obj)) {
+      return GST_SAMPLE_CAST (obj);
+    } else {
+      gst_mini_object_unref (obj);
+    }
+  }
+}
+
+/**
+ * gst_app_sink_try_pull_object: (skip)
+ * @appsink: a #GstAppSink
+ * @timeout: the maximum amount of time to wait for a sample
+ *
+ * This function blocks until a sample or an event or EOS becomes available or the appsink
+ * element is set to the READY/NULL state or the timeout expires.
+ *
+ * This function will only return samples when the appsink is in the PLAYING
+ * state. All rendered buffers and events will be put in a queue so that the application
+ * can pull them at its own rate. Note that when the application does not
+ * pull samples fast enough, the queued buffers could consume a lot of memory,
+ * especially when dealing with raw video frames.
+ * Events can be pulled when the appsink is in the READY, PAUSED or PLAYING state.
+ *
+ * This function will only pull serialized events, excluding
+ * the EOS event for which this functions returns
+ * %NULL. Use gst_app_sink_is_eos() to check for the EOS condition.
+ *
+ * This method is a variant of gst_app_sink_try_pull_sample() that can be used
+ * to handle incoming events events as well as samples.
+ *
+ * Note that future releases may extend this API to return other object types
+ * so make sure that your code is checking for the actual type it is handling.
+ *
+ * Returns: (transfer full): a #GstSample, or #GstEvent or NULL when the appsink is stopped or EOS or the timeout expires.
+ * Call gst_mini_object_unref() after usage.
+ *
+ * Since: 1.20
+ */
+GstMiniObject *
+gst_app_sink_try_pull_object (GstAppSink * appsink, GstClockTime timeout)
+{
   GstAppSinkPrivate *priv;
-  GstSample *sample = NULL;
-  GstMiniObject *obj;
+  GstMiniObject *obj = NULL, *ret;
   gboolean timeout_valid;
   gint64 end_time;
 
@@ -1696,18 +1888,18 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout)
   gst_buffer_replace (&priv->preroll_buffer, NULL);
 
   while (TRUE) {
-    GST_DEBUG_OBJECT (appsink, "trying to grab a buffer");
+    GST_DEBUG_OBJECT (appsink, "trying to grab an object");
     if (!priv->started)
       goto not_started;
 
-    if (priv->num_buffers > 0)
+    if (priv->num_buffers > 0 || priv->num_events > 0)
       break;
 
     if (priv->is_eos)
       goto eos;
 
     /* nothing to return, wait */
-    GST_DEBUG_OBJECT (appsink, "waiting for a buffer");
+    GST_DEBUG_OBJECT (appsink, "waiting for an object");
     priv->wait_status |= APP_WAITING;
     if (timeout_valid) {
       if (!g_cond_wait_until (&priv->cond, &priv->mutex, end_time))
@@ -1718,28 +1910,33 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout)
     priv->wait_status &= ~APP_WAITING;
   }
 
-  obj = dequeue_buffer (appsink);
+  obj = dequeue_object (appsink);
+
+  /* convert buffer and buffer list to sample */
   if (GST_IS_BUFFER (obj)) {
     GST_DEBUG_OBJECT (appsink, "we have a buffer %p", obj);
     priv->sample = gst_sample_make_writable (priv->sample);
     gst_sample_set_buffer_list (priv->sample, NULL);
     gst_sample_set_buffer (priv->sample, GST_BUFFER_CAST (obj));
-    sample = gst_sample_ref (priv->sample);
-  } else {
+    ret = GST_MINI_OBJECT_CAST (gst_sample_ref (priv->sample));
+    gst_mini_object_unref (obj);
+  } else if (GST_IS_BUFFER_LIST (obj)) {
     GST_DEBUG_OBJECT (appsink, "we have a list %p", obj);
     priv->sample = gst_sample_make_writable (priv->sample);
     gst_sample_set_buffer (priv->sample, NULL);
     gst_sample_set_buffer_list (priv->sample, GST_BUFFER_LIST_CAST (obj));
-    sample = gst_sample_ref (priv->sample);
+    ret = GST_MINI_OBJECT_CAST (gst_sample_ref (priv->sample));
+    gst_mini_object_unref (obj);
+  } else {
+    ret = obj;
   }
-  gst_mini_object_unref (obj);
 
   if ((priv->wait_status & STREAM_WAITING))
     g_cond_signal (&priv->cond);
 
   g_mutex_unlock (&priv->mutex);
 
-  return sample;
+  return ret;
 
   /* special conditions */
 expired:
index 036b86e..90e678f 100644 (file)
@@ -59,6 +59,14 @@ typedef struct _GstAppSinkPrivate GstAppSinkPrivate;
  *       The new sample can be retrieved with
  *       gst_app_sink_pull_sample() either from this callback
  *       or from any other thread.
+ * @new_event: Called when a new event is available.
+ *       This callback is called from the streaming thread.
+ *       The new event can be retrieved with
+ *       gst_app_sink_pull_event() either from this callback
+ *       or from any other thread.
+ *       The callback should return %TRUE if the event has been handled,
+ *       %FALSE otherwise.
+ *       Since: 1.20
  *
  * A set of callbacks that can be installed on the appsink with
  * gst_app_sink_set_callbacks().
@@ -67,9 +75,10 @@ typedef struct {
   void          (*eos)              (GstAppSink *appsink, gpointer user_data);
   GstFlowReturn (*new_preroll)      (GstAppSink *appsink, gpointer user_data);
   GstFlowReturn (*new_sample)       (GstAppSink *appsink, gpointer user_data);
+  gboolean      (*new_event)        (GstAppSink *appsink, gpointer user_data);
 
   /*< private >*/
-  gpointer     _gst_reserved[GST_PADDING];
+  gpointer     _gst_reserved[GST_PADDING - 1];
 } GstAppSinkCallbacks;
 
 struct _GstAppSink
@@ -91,15 +100,24 @@ struct _GstAppSinkClass
   void          (*eos)              (GstAppSink *appsink);
   GstFlowReturn (*new_preroll)      (GstAppSink *appsink);
   GstFlowReturn (*new_sample)       (GstAppSink *appsink);
+  /* new_event is missing as we ran out padding */
 
   /* actions */
   GstSample *   (*pull_preroll)      (GstAppSink *appsink);
   GstSample *   (*pull_sample)       (GstAppSink *appsink);
   GstSample *   (*try_pull_preroll)  (GstAppSink *appsink, GstClockTime timeout);
   GstSample *   (*try_pull_sample)   (GstAppSink *appsink, GstClockTime timeout);
+ /**
+   * GstAppSinkClass::try_pull_object:
+   *
+   * See #GstAppSink::try-pull-object: signal.
+   *
+   * Since: 1.20
+   */
+  GstMiniObject * (*try_pull_object) (GstAppSink *appsink, GstClockTime timeout);
 
   /*< private >*/
-  gpointer     _gst_reserved[GST_PADDING - 2];
+  gpointer     _gst_reserved[GST_PADDING - 3];
 };
 
 GST_APP_API
@@ -151,12 +169,18 @@ GST_APP_API
 GstSample *     gst_app_sink_pull_sample      (GstAppSink *appsink);
 
 GST_APP_API
+GstMiniObject * gst_app_sink_pull_object      (GstAppSink *appsink);
+
+GST_APP_API
 GstSample *     gst_app_sink_try_pull_preroll (GstAppSink *appsink, GstClockTime timeout);
 
 GST_APP_API
 GstSample *     gst_app_sink_try_pull_sample  (GstAppSink *appsink, GstClockTime timeout);
 
 GST_APP_API
+GstMiniObject * gst_app_sink_try_pull_object    (GstAppSink *appsink, GstClockTime timeout);
+
+GST_APP_API
 void            gst_app_sink_set_callbacks    (GstAppSink * appsink,
                                                GstAppSinkCallbacks *callbacks,
                                                gpointer user_data,
index 9acbdcb..d5e62ac 100644 (file)
@@ -714,6 +714,197 @@ GST_START_TEST (test_pull_sample_refcounts)
 
 GST_END_TEST;
 
+static gboolean
+new_event_cb (GstAppSink * appsink, gpointer callback_data)
+{
+  guint *new_event_count = callback_data;
+  *new_event_count += 1;
+  return TRUE;
+}
+
+/* Verifies that the event callback is called */
+GST_START_TEST (test_event_callback)
+{
+  GstElement *sink;
+  GstPad *sinkpad;
+  GstBuffer *buffer;
+  guint new_event_count;
+  GstAppSinkCallbacks callbacks = { NULL };
+  GstMiniObject *object;
+  GstAppSink *app_sink;
+
+  sink = setup_appsink ();
+  app_sink = GST_APP_SINK (sink);
+
+  callbacks.new_event = new_event_cb;
+
+  gst_app_sink_set_callbacks (app_sink, &callbacks, &new_event_count, NULL);
+
+  ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
+
+  /* push a buffer so pending events are pushed */
+  buffer = gst_buffer_new_and_alloc (4);
+  fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+
+  /* flush pending events from the queue */
+  while ((object = gst_app_sink_try_pull_object (app_sink, 0)))
+    gst_mini_object_unref (object);
+  new_event_count = 0;
+
+  /* push a buffer */
+  buffer = gst_buffer_new_and_alloc (4);
+  fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+
+  /* push custom event */
+  sinkpad = gst_element_get_static_pad (sink, "sink");
+  fail_unless (sinkpad);
+  fail_unless (gst_pad_send_event (sinkpad,
+          gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
+              gst_structure_new ("custom", NULL, NULL))));
+  fail_unless_equals_int (new_event_count, 1);
+  gst_object_unref (sinkpad);
+
+  /* push a second buffer */
+  buffer = gst_buffer_new_and_alloc (4);
+  fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+
+  /* check if the samples and events are pulled in the right order */
+  object = gst_app_sink_pull_object (app_sink);
+  fail_unless (GST_IS_SAMPLE (object));
+  gst_mini_object_unref (object);
+
+  object = gst_app_sink_pull_object (app_sink);
+  fail_unless (GST_IS_EVENT (object));
+  fail_unless_equals_int (GST_EVENT_TYPE (object), GST_EVENT_CUSTOM_DOWNSTREAM);
+  gst_mini_object_unref (object);
+
+  object = gst_app_sink_pull_object (app_sink);
+  fail_unless (GST_IS_SAMPLE (object));
+  gst_mini_object_unref (object);
+
+  GST_DEBUG ("cleaning up appsink");
+  ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+  cleanup_appsink (sink);
+}
+
+GST_END_TEST;
+
+
+GST_START_TEST (test_event_signals)
+{
+  GstElement *sink;
+  GstPad *sinkpad;
+  GstBuffer *buffer;
+  GstMiniObject *object;
+  GstAppSink *app_sink;
+  guint new_event_count = 0;
+
+  sink = setup_appsink ();
+  app_sink = GST_APP_SINK (sink);
+
+  g_object_set (sink, "emit-signals", TRUE, NULL);
+
+  g_signal_connect (sink, "new-serialized-event", G_CALLBACK (new_event_cb),
+      &new_event_count);
+
+  ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
+
+  /* push a buffer so pending events are pushed */
+  buffer = gst_buffer_new_and_alloc (4);
+  fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+
+  /* flush pending events from the queue */
+  while ((object = gst_app_sink_try_pull_object (app_sink, 0)))
+    gst_mini_object_unref (object);
+  new_event_count = 0;
+
+  /* push a buffer */
+  buffer = gst_buffer_new_and_alloc (4);
+  fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+
+  /* push custom event */
+  sinkpad = gst_element_get_static_pad (sink, "sink");
+  fail_unless (sinkpad);
+  fail_unless (gst_pad_send_event (sinkpad,
+          gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
+              gst_structure_new ("custom", NULL, NULL))));
+  fail_unless_equals_int (new_event_count, 1);
+  gst_object_unref (sinkpad);
+
+  /* push a second buffer */
+  buffer = gst_buffer_new_and_alloc (4);
+  fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+
+  /* check if the buffers and events are pulled in the right order */
+  g_signal_emit_by_name (sink, "try-pull-object", GST_CLOCK_TIME_NONE, &object);
+  fail_unless (GST_IS_SAMPLE (object));
+  gst_mini_object_unref (object);
+
+  g_signal_emit_by_name (sink, "try-pull-object", GST_CLOCK_TIME_NONE, &object);
+  fail_unless (GST_IS_EVENT (object));
+  fail_unless_equals_int (GST_EVENT_TYPE (object), GST_EVENT_CUSTOM_DOWNSTREAM);
+  gst_mini_object_unref (object);
+
+  g_signal_emit_by_name (sink, "try-pull-object", GST_CLOCK_TIME_NONE, &object);
+  fail_unless (GST_IS_SAMPLE (object));
+  gst_mini_object_unref (object);
+
+  GST_DEBUG ("cleaning up appsink");
+  ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+  cleanup_appsink (sink);
+
+}
+
+GST_END_TEST;
+
+/* try pulling events when appsink is in PAUSED */
+GST_START_TEST (test_event_paused)
+{
+  GstElement *sink;
+  guint new_event_count = 0;
+  GstAppSinkCallbacks callbacks = { NULL };
+  GstMiniObject *object;
+  GstAppSink *app_sink;
+  GstCaps *caps;
+
+  sink = setup_appsink ();
+  app_sink = GST_APP_SINK (sink);
+
+  callbacks.new_event = new_event_cb;
+
+  gst_app_sink_set_callbacks (app_sink, &callbacks, &new_event_count, NULL);
+
+  ASSERT_SET_STATE (sink, GST_STATE_PAUSED, GST_STATE_CHANGE_ASYNC);
+
+  /* push a couple of events while in PAUSED */
+  gst_pad_push_event (mysrcpad, gst_event_new_stream_start ("test"));
+  caps = gst_caps_new_simple ("audio/x-raw", NULL, NULL);
+  gst_pad_push_event (mysrcpad, gst_event_new_caps (caps));
+  gst_caps_unref (caps);
+
+  fail_unless_equals_int (new_event_count, 2);
+
+  /* check pulled events */
+  object = gst_app_sink_pull_object (app_sink);
+  fail_unless (GST_IS_EVENT (object));
+  fail_unless_equals_int (GST_EVENT_TYPE (object), GST_EVENT_STREAM_START);
+  gst_mini_object_unref (object);
+
+  object = gst_app_sink_pull_object (app_sink);
+  fail_unless (GST_IS_EVENT (object));
+  fail_unless_equals_int (GST_EVENT_TYPE (object), GST_EVENT_CAPS);
+  gst_mini_object_unref (object);
+
+  object = gst_app_sink_try_pull_object (app_sink, 0);
+  fail_if (object);
+
+  GST_DEBUG ("cleaning up appsink");
+  ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+  cleanup_appsink (sink);
+}
+
+GST_END_TEST;
+
 static Suite *
 appsink_suite (void)
 {
@@ -735,7 +926,9 @@ appsink_suite (void)
   tcase_add_test (tc_chain, test_pull_preroll);
   tcase_add_test (tc_chain, test_do_not_care_preroll);
   tcase_add_test (tc_chain, test_pull_sample_refcounts);
-
+  tcase_add_test (tc_chain, test_event_callback);
+  tcase_add_test (tc_chain, test_event_signals);
+  tcase_add_test (tc_chain, test_event_paused);
   return s;
 }