rtponviftimestamp: Do not rearange order of data
authorLinus Svensson <linussn@axis.com>
Fri, 6 Nov 2015 08:44:16 +0000 (09:44 +0100)
committerOlivier CrĂȘte <olivier.crete@collabora.com>
Fri, 6 Nov 2015 17:55:25 +0000 (12:55 -0500)
If a buffer or a buffer list is cached, no events serialized with the
data stream should get through. The cached buffers and events should
be purged when we stop flushing.

https://bugzilla.gnome.org/show_bug.cgi?id=757688

gst/onvif/gstrtponviftimestamp.c
gst/onvif/gstrtponviftimestamp.h
tests/check/elements/rtponviftimestamp.c

index 818c8af..b539c83 100644 (file)
@@ -42,6 +42,11 @@ static GstFlowReturn gst_rtp_onvif_timestamp_chain (GstPad * pad,
 static GstFlowReturn gst_rtp_onvif_timestamp_chain_list (GstPad * pad,
     GstObject * parent, GstBufferList * list);
 
+static GstFlowReturn handle_and_push_buffer (GstRtpOnvifTimestamp * self,
+    GstBuffer * buf, gboolean end_contiguous);
+static GstFlowReturn handle_and_push_buffer_list (GstRtpOnvifTimestamp * self,
+    GstBufferList * list, gboolean end_contiguous);
+
 static GstStaticPadTemplate sink_template_factory =
 GST_STATIC_PAD_TEMPLATE ("sink",
     GST_PAD_SINK,
@@ -112,6 +117,65 @@ gst_rtp_onvif_timestamp_set_property (GObject * object,
   }
 }
 
+/* send cached buffer or list, and events, if present */
+static GstFlowReturn
+send_cached_buffer_and_events (GstRtpOnvifTimestamp * self,
+    gboolean end_contiguous)
+{
+  GstFlowReturn ret = GST_FLOW_OK;
+
+  g_assert (!(self->buffer && self->list));
+
+  if (self->buffer) {
+    GST_DEBUG_OBJECT (self, "pushing %" GST_PTR_FORMAT, self->buffer);
+    ret = handle_and_push_buffer (self, self->buffer, end_contiguous);
+    self->buffer = NULL;
+  }
+  if (self->list) {
+    GST_DEBUG_OBJECT (self, "pushing %" GST_PTR_FORMAT, self->list);
+    ret = handle_and_push_buffer_list (self, self->list, end_contiguous);
+    self->list = NULL;
+  }
+
+  if (ret != GST_FLOW_OK)
+    goto out;
+
+  while (!g_queue_is_empty (self->event_queue)) {
+    GstEvent *event;
+
+    event = GST_EVENT_CAST (g_queue_pop_head (self->event_queue));
+    GST_LOG_OBJECT (self->sinkpad, "sending %" GST_PTR_FORMAT, event);
+    (void) gst_pad_send_event (self->sinkpad, event);
+  }
+
+out:
+  return ret;
+}
+
+static void
+purge_cached_buffer_and_events (GstRtpOnvifTimestamp * self)
+{
+  g_assert (!(self->buffer && self->list));
+
+  if (self->buffer) {
+    GST_DEBUG_OBJECT (self, "purging %" GST_PTR_FORMAT, self->buffer);
+    gst_buffer_unref (self->buffer);
+    self->buffer = NULL;
+  }
+  if (self->list) {
+    GST_DEBUG_OBJECT (self, "purging %" GST_PTR_FORMAT, self->list);
+    gst_buffer_list_unref (self->list);
+    self->list = NULL;
+  }
+
+  while (!g_queue_is_empty (self->event_queue)) {
+    GstEvent *event;
+
+    event = GST_EVENT_CAST (g_queue_pop_head (self->event_queue));
+    gst_event_unref (event);
+  }
+}
+
 static GstStateChangeReturn
 gst_rtp_onvif_timestamp_change_state (GstElement * element,
     GstStateChange transition)
@@ -121,6 +185,7 @@ gst_rtp_onvif_timestamp_change_state (GstElement * element,
 
   switch (transition) {
     case GST_STATE_CHANGE_PAUSED_TO_READY:
+      purge_cached_buffer_and_events (self);
       gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
       break;
     default:
@@ -161,10 +226,7 @@ gst_rtp_onvif_timestamp_finalize (GObject * object)
 {
   GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (object);
 
-  if (self->buffer)
-    gst_buffer_unref (self->buffer);
-  if (self->list)
-    gst_buffer_list_unref (self->list);
+  g_queue_free (self->event_queue);
 
   G_OBJECT_CLASS (gst_rtp_onvif_timestamp_parent_class)->finalize (object);
 }
@@ -219,42 +281,62 @@ gst_rtp_onvif_timestamp_class_init (GstRtpOnvifTimestampClass * klass)
       0, "ONVIF NTP timestamps RTP extension");
 }
 
-static GstFlowReturn handle_and_push_buffer (GstRtpOnvifTimestamp * self,
-    GstBuffer * buf, gboolean end_contiguous);
-static GstFlowReturn handle_and_push_buffer_list (GstRtpOnvifTimestamp * self,
-    GstBufferList * list, gboolean end_contiguous);
-
 static gboolean
 gst_rtp_onvif_timestamp_sink_event (GstPad * pad, GstObject * parent,
     GstEvent * event)
 {
   GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (parent);
+  gboolean drop = FALSE;
+  gboolean ret = TRUE;
 
   GST_DEBUG_OBJECT (pad, "handling event %s", GST_EVENT_TYPE_NAME (event));
 
+  /* handle serialized events, which, should not be enqueued */
   switch (GST_EVENT_TYPE (event)) {
-    case GST_EVENT_SEGMENT:
-      gst_event_copy_segment (event, &self->segment);
-      break;
+    case GST_EVENT_EOS:
+      {
+        GstFlowReturn res;
+        /* Push pending buffers, if any */
+        res = send_cached_buffer_and_events (self, TRUE);
+        if (res != GST_FLOW_OK) {
+          drop = TRUE;
+          ret = FALSE;
+          goto out;
+        }
+        break;
+      }
     case GST_EVENT_FLUSH_STOP:
+      purge_cached_buffer_and_events (self);
       gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
       break;
-    case GST_EVENT_EOS:
-      /* Push pending buffers, if any */
-      if (self->buffer) {
-        handle_and_push_buffer (self, self->buffer, TRUE);
-        self->buffer = NULL;
-      }
-      if (self->list) {
-        handle_and_push_buffer_list (self, self->list, TRUE);
-        self->list = NULL;
-      }
+    default:
+      break;
+  }
+
+  /* enqueue serialized events if there is a cached buffer */
+  if (GST_EVENT_IS_SERIALIZED (event) && (self->buffer || self->list)) {
+    GST_DEBUG ("enqueueing serialized event");
+    g_queue_push_tail (self->event_queue, event);
+    event = NULL;
+    goto out;
+  }
+
+  /* handle rest of the events */
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_SEGMENT:
+      gst_event_copy_segment (event, &self->segment);
       break;
     default:
       break;
   }
 
-  return gst_pad_event_default (pad, parent, event);
+out:
+  if (drop)
+    gst_event_unref (event);
+  else if (event)
+    ret = gst_pad_event_default (pad, parent, event);
+
+  return ret;
 }
 
 static void
@@ -278,10 +360,11 @@ gst_rtp_onvif_timestamp_init (GstRtpOnvifTimestamp * self)
   self->prop_ntp_offset = DEFAULT_NTP_OFFSET;
   self->prop_set_e_bit = DEFAULT_SET_E_BIT;
 
+  gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
+
+  self->event_queue = g_queue_new ();
   self->buffer = NULL;
   self->list = NULL;
-
-  gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED);
 }
 
 #define EXTENSION_ID 0xABAC
@@ -414,7 +497,7 @@ done:
   return TRUE;
 }
 
-/* @buf: (transfer all) */
+/* @buf: (transfer full) */
 static GstFlowReturn
 handle_and_push_buffer (GstRtpOnvifTimestamp * self, GstBuffer * buf,
     gboolean end_contiguous)
@@ -439,20 +522,15 @@ gst_rtp_onvif_timestamp_chain (GstPad * pad, GstObject * parent,
     return handle_and_push_buffer (self, buf, FALSE);
   }
 
-  /* We have to wait for the *next* buffer before pushing this one */
+  /* send any previously cached item(s), this leaves an empty queue */
+  result = send_cached_buffer_and_events (self, GST_BUFFER_IS_DISCONT (buf));
 
-  if (self->buffer) {
-    /* push the *previous* buffer received */
-    result = handle_and_push_buffer (self, self->buffer,
-        GST_BUFFER_IS_DISCONT (buf));
-  }
-
-  /* Transfer ownership */
+  /* enqueue the new item, as the only item in the queue */
   self->buffer = buf;
   return result;
 }
 
-/* @buf: (transfer all) */
+/* @buf: (transfer full) */
 static GstFlowReturn
 handle_and_push_buffer_list (GstRtpOnvifTimestamp * self,
     GstBufferList * list, gboolean end_contiguous)
@@ -477,24 +555,18 @@ gst_rtp_onvif_timestamp_chain_list (GstPad * pad, GstObject * parent,
     GstBufferList * list)
 {
   GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (parent);
-  GstFlowReturn result = GST_FLOW_OK;
   GstBuffer *buf;
+  GstFlowReturn result = GST_FLOW_OK;
 
   if (!self->prop_set_e_bit) {
     return handle_and_push_buffer_list (self, list, FALSE);
   }
 
-  /* We have to wait for the *next* list before pushing this one */
-
-  if (self->list) {
-    /* push the *previous* list received */
-    buf = gst_buffer_list_get (list, 0);
-
-    result = handle_and_push_buffer_list (self, self->list,
-        GST_BUFFER_IS_DISCONT (buf));
-  }
+  /* send any previously cached item(s), this leaves an empty queue */
+  buf = gst_buffer_list_get (list, 0);
+  result = send_cached_buffer_and_events (self, GST_BUFFER_IS_DISCONT (buf));
 
-  /* Transfer ownership */
+  /* enqueue the new item, as the only item in the queue */
   self->list = list;
   return result;
 }
index 40ad8d3..b8e2b8e 100644 (file)
@@ -55,8 +55,8 @@ struct _GstRtpOnvifTimestamp {
   GstClockTime ntp_offset;
 
   GstSegment segment;
-  gboolean received_segment;
   /* Buffer waiting to be handled, only used if prop_set_e_bit is TRUE */
+  GQueue *event_queue;
   GstBuffer *buffer;
   GstBufferList *list;
 };
index 790dc68..077220e 100644 (file)
 static GstElement *element;
 static GstPad *mysrcpad;
 static GstPad *mysinkpad;
+/* These are global mainly because they are used from the setup/cleanup
+ * fixture functions */
+static gulong myprobe;
+static GList *mypushedevents;
+static GList *myreceivedevents;
 
 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
     GST_PAD_SINK,
@@ -39,8 +44,86 @@ static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
     GST_STATIC_CAPS ("application/x-rtp")
     );
 
-#define NTP_OFFSET ((guint64) 1245)
-#define TIMESTAMP ((GstClockTime) 42)
+#define NTP_OFFSET  ((guint64) 1245)
+#define TIMESTAMP   ((GstClockTime)42)
+#define COMPARE     TRUE
+#define NO_COMPARE  FALSE
+
+static GstPadProbeReturn
+event_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+  GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
+
+  GST_DEBUG ("got %s",  GST_EVENT_TYPE_NAME (event));
+  myreceivedevents = g_list_append (myreceivedevents, gst_event_ref (event));
+
+  return GST_PAD_PROBE_OK;
+}
+
+static GstEvent *
+create_event (GstEventType type)
+{
+  GstEvent *event = NULL;
+
+  switch (type) {
+   case GST_EVENT_CUSTOM_DOWNSTREAM:
+    event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
+        gst_structure_new ("x-app/test", "test-field", G_TYPE_STRING,
+            "test-value", NULL));
+    break;
+   case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
+    event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_OOB,
+        gst_structure_new ("x-app/test", "test-field", G_TYPE_STRING,
+            "test-value", NULL));
+    break;
+   case GST_EVENT_EOS:
+    event = gst_event_new_eos ();
+    break;
+   default:
+    g_assert_not_reached ();
+    break;
+  }
+
+  return event;
+}
+
+static void
+create_and_push_event (GstEventType type)
+{
+  GstEvent *event = create_event (type);
+
+  mypushedevents = g_list_append (mypushedevents, event);
+  fail_unless (gst_pad_push_event (mysrcpad, event));
+}
+
+static void
+check_and_clear_events (gint expected, gboolean compare)
+{
+  GList *p;
+  GList *r;
+
+  /* verify that there's as many queued events as expected */
+  fail_unless_equals_int (g_list_length (myreceivedevents), expected);
+
+  if (compare) {
+    fail_unless_equals_int (expected, g_list_length (mypushedevents));
+
+    /* verify that the events are queued in the expected order */
+    r = myreceivedevents;
+    p = mypushedevents;
+
+    while (p != NULL) {
+      fail_unless_equals_pointer (p->data, r->data);
+      p = g_list_next (p);
+      r = g_list_next (r);
+    }
+  }
+
+  g_list_free_full (myreceivedevents, (GDestroyNotify)gst_event_unref);
+  myreceivedevents = NULL;
+  g_list_free (mypushedevents);
+  mypushedevents = NULL;
+}
 
 static void
 setup (void)
@@ -54,7 +137,6 @@ setup (void)
   gst_pad_set_active (mysrcpad, TRUE);
 }
 
-
 static void
 cleanup (void)
 {
@@ -70,6 +152,30 @@ cleanup (void)
 
   gst_check_teardown_element (element);
   element = NULL;
+
+  gst_check_drop_buffers ();
+}
+
+static void
+setup_with_event (void)
+{
+  setup ();
+
+  myprobe = gst_pad_add_probe (mysinkpad,
+      GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, event_probe, NULL, NULL);
+  myreceivedevents = NULL;
+  mypushedevents = NULL;
+}
+
+static void
+cleanup_with_event (void)
+{
+  gst_pad_remove_probe (mysinkpad, myprobe);
+  myprobe = 0;
+  myreceivedevents = NULL;
+  mypushedevents = NULL;
+
+  cleanup ();
 }
 
 static void
@@ -290,22 +396,263 @@ GST_START_TEST (test_apply_e_bit)
 
 GST_END_TEST;
 
-static Suite *
-onviftimestamp_suite (void)
+GST_START_TEST (test_flushing)
 {
-  Suite *s = suite_create ("onviftimestamp");
-  TCase *tc_chain;
+  GstBuffer *buffer;
+
+  /* set the e-bit, so the element use caching */
+  g_object_set (element, "set-e-bit", TRUE, NULL);
+  /* set the ntp-offset, since no one will provide a clock */
+  g_object_set (element, "ntp-offset", NTP_OFFSET, NULL);
+
+  ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
+  gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME);
+
+  /* create and push the first buffer */
+  buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE);
+  fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
+
+  /* no buffers should have made it through */
+  fail_unless_equals_int (g_list_length (buffers), 0);
+
+  /* flush the element */
+  fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_flush_start ()));
+  fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_flush_stop (FALSE)));
+
+  /* resend events */
+  gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME);
+
+  /* create and push a second buffer */
+  buffer = create_rtp_buffer (TIMESTAMP + 1, TRUE, FALSE);
+  fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
+
+  /* still no buffers should have made it through (the first one should have
+   * been dropped during flushing) */
+  fail_unless_equals_int (g_list_length (buffers), 0);
+
+  ASSERT_SET_STATE (element, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_reusable_element_no_e_bit)
+{
+  GstBuffer *buffer;
+
+  /* set the ntp-offset, since no one will provide a clock */
+  g_object_set (element, "ntp-offset", NTP_OFFSET, NULL);
+
+  ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
+  gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME);
+
+  /* create and push the first buffer */
+  buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE);
+  fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
+
+  /* create and push a second buffer */
+  buffer = create_rtp_buffer (TIMESTAMP + 1, TRUE, FALSE);
+  fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
+
+  /* create and push a third buffer */
+  buffer = create_rtp_buffer (TIMESTAMP + 2, TRUE, FALSE);
+  fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
+
+  ASSERT_SET_STATE (element, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+
+  fail_unless_equals_int (g_list_length (buffers), 3);
+
+  ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
+  gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME);
 
-  tc_chain = tcase_create ("apply");
+  /* create and push the first buffer */
+  buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE);
+  fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
 
-  suite_add_tcase (s, tc_chain);
-  tcase_add_checked_fixture (tc_chain, setup, cleanup);
+  /* create and push a second buffer */
+  buffer = create_rtp_buffer (TIMESTAMP + 1, TRUE, FALSE);
+  fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
 
-  tcase_add_test (tc_chain, test_apply_discont);
-  tcase_add_test (tc_chain, test_apply_not_discont);
-  tcase_add_test (tc_chain, test_apply_clean_point);
-  tcase_add_test (tc_chain, test_apply_no_e_bit);
-  tcase_add_test (tc_chain, test_apply_e_bit);
+  /* create and push a third buffer */
+  buffer = create_rtp_buffer (TIMESTAMP + 2, TRUE, FALSE);
+  fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
+
+  ASSERT_SET_STATE (element, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+
+  fail_unless_equals_int (g_list_length (buffers), 6);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_reusable_element_e_bit)
+{
+  GstBuffer *buffer;
+
+  /* set the e-bit, so the element use caching */
+  g_object_set (element, "set-e-bit", TRUE, NULL);
+  /* set the ntp-offset, since no one will provide a clock */
+  g_object_set (element, "ntp-offset", NTP_OFFSET, NULL);
+
+  ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
+  gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME);
+
+  /* create and push the first buffer */
+  buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE);
+  fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
+
+  /* create and push a second buffer */
+  buffer = create_rtp_buffer (TIMESTAMP + 1, TRUE, FALSE);
+  fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
+
+  /* create and push a third buffer */
+  buffer = create_rtp_buffer (TIMESTAMP + 2, TRUE, FALSE);
+  fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
+
+  ASSERT_SET_STATE (element, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+
+  fail_unless_equals_int (g_list_length (buffers), 2);
+
+  ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
+  gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME);
+
+  /* create and push the first buffer */
+  buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE);
+  fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
+
+  /* create and push a second buffer */
+  buffer = create_rtp_buffer (TIMESTAMP + 1, TRUE, FALSE);
+  fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
+
+  /* create and push a third buffer */
+  buffer = create_rtp_buffer (TIMESTAMP + 2, TRUE, FALSE);
+  fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
+
+  ASSERT_SET_STATE (element, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+
+  fail_unless_equals_int (g_list_length (buffers), 4);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_serialized_events)
+{
+  GstBuffer *buffer;
+
+  /* we want the e-bit set so that buffers are cached */
+  g_object_set (element, "set-e-bit", TRUE, NULL);
+  g_object_set (element, "ntp-offset", NTP_OFFSET, NULL);
+
+  ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
+
+  /* send intitial events (stream-start and segment) */
+  gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME);
+  check_and_clear_events (2, NO_COMPARE);
+
+  /* events received while no buffer is cached should be forwarded */
+  create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM);
+  check_and_clear_events (1, NO_COMPARE);
+
+  /* create and push the first buffer, which should be cached */
+  buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE);
+  fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
+  fail_unless_equals_int (g_list_length (buffers), 0);
+  /* serialized events should be queued when there's a buffer cached */
+  create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM);
+  fail_unless_equals_int (g_list_length (myreceivedevents), 0);
+  /* there's still a buffer cached... */
+  create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM);
+  fail_unless_equals_int (g_list_length (myreceivedevents), 0);
+
+  /* receiving a new buffer should let the first through, along with the
+   * queued serialized events  */
+  buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE);
+  fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
+  fail_unless_equals_int (g_list_length (buffers), 1);
+  check_and_clear_events (2, COMPARE);
+
+  /* there's still a buffer cached, a new serialized event should be quueud */
+  create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM);
+  fail_unless_equals_int (g_list_length (myreceivedevents), 0);
+
+  /* when receiving an EOS cached buffer and queued events should be forwarded */
+  create_and_push_event (GST_EVENT_EOS);
+  check_and_clear_events (2, COMPARE);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_non_serialized_events)
+{
+  GstEvent *event;
+  GstBuffer *buffer;
+
+  /* we want the e-bit set so that buffers are cached */
+  g_object_set (element, "set-e-bit", TRUE, NULL);
+  g_object_set (element, "ntp-offset", NTP_OFFSET, NULL);
+
+  ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS);
+
+  /* send intitial events (stream-start and segment) */
+  gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME);
+  fail_unless_equals_int (g_list_length (myreceivedevents), 2);
+  check_and_clear_events (2, NO_COMPARE);
+
+  /* events received while no buffer is cached should be forwarded */
+  create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM_OOB);
+  check_and_clear_events (1, COMPARE);
+
+  /* create and push the first buffer, which should be cached */
+  buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE);
+  fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK);
+  fail_unless_equals_int (g_list_length (buffers), 0);
+  /* non-serialized events should be forwarded regardless of whether
+   * there is a cached buffer */
+  create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM_OOB);
+  check_and_clear_events (1, COMPARE);
+
+  /* there's still a buffer cached, push a serialized event and make sure
+   * it's queued */
+  create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM);
+  fail_unless_equals_int (g_list_length (myreceivedevents), 0);
+  /* non-serialized events should be forwarded regardless of whether there
+   * are serialized events queued, thus the g_list_prepend below */
+  event = create_event (GST_EVENT_CUSTOM_DOWNSTREAM_OOB);
+  mypushedevents = g_list_prepend (mypushedevents, event);
+  fail_unless (gst_pad_push_event (mysrcpad, event));
+  fail_unless_equals_int (g_list_length (myreceivedevents), 1);
+
+  /* when receiving an EOS cached buffer and queued events should be forwarded */
+  create_and_push_event (GST_EVENT_EOS);
+  fail_unless_equals_int (g_list_length (buffers), 1);
+  check_and_clear_events (3, COMPARE);
+}
+
+GST_END_TEST;
+
+static Suite *
+onviftimestamp_suite (void)
+{
+  Suite *s = suite_create ("onviftimestamp");
+  TCase *tc_general, *tc_events;
+
+  tc_general = tcase_create ("apply");
+  suite_add_tcase (s, tc_general);
+  tcase_add_checked_fixture (tc_general, setup, cleanup);
+
+  tcase_add_test (tc_general, test_apply_discont);
+  tcase_add_test (tc_general, test_apply_not_discont);
+  tcase_add_test (tc_general, test_apply_clean_point);
+  tcase_add_test (tc_general, test_apply_no_e_bit);
+  tcase_add_test (tc_general, test_apply_e_bit);
+  tcase_add_test (tc_general, test_flushing);
+  tcase_add_test (tc_general, test_reusable_element_no_e_bit);
+  tcase_add_test (tc_general, test_reusable_element_e_bit);
+
+  tc_events = tcase_create ("events");
+  suite_add_tcase (s, tc_events);
+  tcase_add_checked_fixture (tc_events, setup_with_event, cleanup_with_event);
+
+  tcase_add_test (tc_events, test_serialized_events);
+  tcase_add_test (tc_events, test_non_serialized_events);
 
   return s;
 }