From a58826292eab80e83b9de525753895e56410e301 Mon Sep 17 00:00:00 2001 From: Linus Svensson Date: Fri, 6 Nov 2015 09:44:16 +0100 Subject: [PATCH] rtponviftimestamp: Do not rearange order of data If a buffer or a buffer list is cached, no events serialized with the data stream should get through. The cached buffers and events should be purged when we stop flushing. https://bugzilla.gnome.org/show_bug.cgi?id=757688 --- gst/onvif/gstrtponviftimestamp.c | 164 ++++++++++---- gst/onvif/gstrtponviftimestamp.h | 2 +- tests/check/elements/rtponviftimestamp.c | 377 +++++++++++++++++++++++++++++-- 3 files changed, 481 insertions(+), 62 deletions(-) diff --git a/gst/onvif/gstrtponviftimestamp.c b/gst/onvif/gstrtponviftimestamp.c index 818c8af..b539c83 100644 --- a/gst/onvif/gstrtponviftimestamp.c +++ b/gst/onvif/gstrtponviftimestamp.c @@ -42,6 +42,11 @@ static GstFlowReturn gst_rtp_onvif_timestamp_chain (GstPad * pad, static GstFlowReturn gst_rtp_onvif_timestamp_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list); +static GstFlowReturn handle_and_push_buffer (GstRtpOnvifTimestamp * self, + GstBuffer * buf, gboolean end_contiguous); +static GstFlowReturn handle_and_push_buffer_list (GstRtpOnvifTimestamp * self, + GstBufferList * list, gboolean end_contiguous); + static GstStaticPadTemplate sink_template_factory = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, @@ -112,6 +117,65 @@ gst_rtp_onvif_timestamp_set_property (GObject * object, } } +/* send cached buffer or list, and events, if present */ +static GstFlowReturn +send_cached_buffer_and_events (GstRtpOnvifTimestamp * self, + gboolean end_contiguous) +{ + GstFlowReturn ret = GST_FLOW_OK; + + g_assert (!(self->buffer && self->list)); + + if (self->buffer) { + GST_DEBUG_OBJECT (self, "pushing %" GST_PTR_FORMAT, self->buffer); + ret = handle_and_push_buffer (self, self->buffer, end_contiguous); + self->buffer = NULL; + } + if (self->list) { + GST_DEBUG_OBJECT (self, "pushing %" GST_PTR_FORMAT, self->list); + ret = handle_and_push_buffer_list (self, self->list, end_contiguous); + self->list = NULL; + } + + if (ret != GST_FLOW_OK) + goto out; + + while (!g_queue_is_empty (self->event_queue)) { + GstEvent *event; + + event = GST_EVENT_CAST (g_queue_pop_head (self->event_queue)); + GST_LOG_OBJECT (self->sinkpad, "sending %" GST_PTR_FORMAT, event); + (void) gst_pad_send_event (self->sinkpad, event); + } + +out: + return ret; +} + +static void +purge_cached_buffer_and_events (GstRtpOnvifTimestamp * self) +{ + g_assert (!(self->buffer && self->list)); + + if (self->buffer) { + GST_DEBUG_OBJECT (self, "purging %" GST_PTR_FORMAT, self->buffer); + gst_buffer_unref (self->buffer); + self->buffer = NULL; + } + if (self->list) { + GST_DEBUG_OBJECT (self, "purging %" GST_PTR_FORMAT, self->list); + gst_buffer_list_unref (self->list); + self->list = NULL; + } + + while (!g_queue_is_empty (self->event_queue)) { + GstEvent *event; + + event = GST_EVENT_CAST (g_queue_pop_head (self->event_queue)); + gst_event_unref (event); + } +} + static GstStateChangeReturn gst_rtp_onvif_timestamp_change_state (GstElement * element, GstStateChange transition) @@ -121,6 +185,7 @@ gst_rtp_onvif_timestamp_change_state (GstElement * element, switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_READY: + purge_cached_buffer_and_events (self); gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED); break; default: @@ -161,10 +226,7 @@ gst_rtp_onvif_timestamp_finalize (GObject * object) { GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (object); - if (self->buffer) - gst_buffer_unref (self->buffer); - if (self->list) - gst_buffer_list_unref (self->list); + g_queue_free (self->event_queue); G_OBJECT_CLASS (gst_rtp_onvif_timestamp_parent_class)->finalize (object); } @@ -219,42 +281,62 @@ gst_rtp_onvif_timestamp_class_init (GstRtpOnvifTimestampClass * klass) 0, "ONVIF NTP timestamps RTP extension"); } -static GstFlowReturn handle_and_push_buffer (GstRtpOnvifTimestamp * self, - GstBuffer * buf, gboolean end_contiguous); -static GstFlowReturn handle_and_push_buffer_list (GstRtpOnvifTimestamp * self, - GstBufferList * list, gboolean end_contiguous); - static gboolean gst_rtp_onvif_timestamp_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) { GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (parent); + gboolean drop = FALSE; + gboolean ret = TRUE; GST_DEBUG_OBJECT (pad, "handling event %s", GST_EVENT_TYPE_NAME (event)); + /* handle serialized events, which, should not be enqueued */ switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_SEGMENT: - gst_event_copy_segment (event, &self->segment); - break; + case GST_EVENT_EOS: + { + GstFlowReturn res; + /* Push pending buffers, if any */ + res = send_cached_buffer_and_events (self, TRUE); + if (res != GST_FLOW_OK) { + drop = TRUE; + ret = FALSE; + goto out; + } + break; + } case GST_EVENT_FLUSH_STOP: + purge_cached_buffer_and_events (self); gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED); break; - case GST_EVENT_EOS: - /* Push pending buffers, if any */ - if (self->buffer) { - handle_and_push_buffer (self, self->buffer, TRUE); - self->buffer = NULL; - } - if (self->list) { - handle_and_push_buffer_list (self, self->list, TRUE); - self->list = NULL; - } + default: + break; + } + + /* enqueue serialized events if there is a cached buffer */ + if (GST_EVENT_IS_SERIALIZED (event) && (self->buffer || self->list)) { + GST_DEBUG ("enqueueing serialized event"); + g_queue_push_tail (self->event_queue, event); + event = NULL; + goto out; + } + + /* handle rest of the events */ + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEGMENT: + gst_event_copy_segment (event, &self->segment); break; default: break; } - return gst_pad_event_default (pad, parent, event); +out: + if (drop) + gst_event_unref (event); + else if (event) + ret = gst_pad_event_default (pad, parent, event); + + return ret; } static void @@ -278,10 +360,11 @@ gst_rtp_onvif_timestamp_init (GstRtpOnvifTimestamp * self) self->prop_ntp_offset = DEFAULT_NTP_OFFSET; self->prop_set_e_bit = DEFAULT_SET_E_BIT; + gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED); + + self->event_queue = g_queue_new (); self->buffer = NULL; self->list = NULL; - - gst_segment_init (&self->segment, GST_FORMAT_UNDEFINED); } #define EXTENSION_ID 0xABAC @@ -414,7 +497,7 @@ done: return TRUE; } -/* @buf: (transfer all) */ +/* @buf: (transfer full) */ static GstFlowReturn handle_and_push_buffer (GstRtpOnvifTimestamp * self, GstBuffer * buf, gboolean end_contiguous) @@ -439,20 +522,15 @@ gst_rtp_onvif_timestamp_chain (GstPad * pad, GstObject * parent, return handle_and_push_buffer (self, buf, FALSE); } - /* We have to wait for the *next* buffer before pushing this one */ + /* send any previously cached item(s), this leaves an empty queue */ + result = send_cached_buffer_and_events (self, GST_BUFFER_IS_DISCONT (buf)); - if (self->buffer) { - /* push the *previous* buffer received */ - result = handle_and_push_buffer (self, self->buffer, - GST_BUFFER_IS_DISCONT (buf)); - } - - /* Transfer ownership */ + /* enqueue the new item, as the only item in the queue */ self->buffer = buf; return result; } -/* @buf: (transfer all) */ +/* @buf: (transfer full) */ static GstFlowReturn handle_and_push_buffer_list (GstRtpOnvifTimestamp * self, GstBufferList * list, gboolean end_contiguous) @@ -477,24 +555,18 @@ gst_rtp_onvif_timestamp_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list) { GstRtpOnvifTimestamp *self = GST_RTP_ONVIF_TIMESTAMP (parent); - GstFlowReturn result = GST_FLOW_OK; GstBuffer *buf; + GstFlowReturn result = GST_FLOW_OK; if (!self->prop_set_e_bit) { return handle_and_push_buffer_list (self, list, FALSE); } - /* We have to wait for the *next* list before pushing this one */ - - if (self->list) { - /* push the *previous* list received */ - buf = gst_buffer_list_get (list, 0); - - result = handle_and_push_buffer_list (self, self->list, - GST_BUFFER_IS_DISCONT (buf)); - } + /* send any previously cached item(s), this leaves an empty queue */ + buf = gst_buffer_list_get (list, 0); + result = send_cached_buffer_and_events (self, GST_BUFFER_IS_DISCONT (buf)); - /* Transfer ownership */ + /* enqueue the new item, as the only item in the queue */ self->list = list; return result; } diff --git a/gst/onvif/gstrtponviftimestamp.h b/gst/onvif/gstrtponviftimestamp.h index 40ad8d3..b8e2b8e 100644 --- a/gst/onvif/gstrtponviftimestamp.h +++ b/gst/onvif/gstrtponviftimestamp.h @@ -55,8 +55,8 @@ struct _GstRtpOnvifTimestamp { GstClockTime ntp_offset; GstSegment segment; - gboolean received_segment; /* Buffer waiting to be handled, only used if prop_set_e_bit is TRUE */ + GQueue *event_queue; GstBuffer *buffer; GstBufferList *list; }; diff --git a/tests/check/elements/rtponviftimestamp.c b/tests/check/elements/rtponviftimestamp.c index 790dc68..077220e 100644 --- a/tests/check/elements/rtponviftimestamp.c +++ b/tests/check/elements/rtponviftimestamp.c @@ -27,6 +27,11 @@ static GstElement *element; static GstPad *mysrcpad; static GstPad *mysinkpad; +/* These are global mainly because they are used from the setup/cleanup + * fixture functions */ +static gulong myprobe; +static GList *mypushedevents; +static GList *myreceivedevents; static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, @@ -39,8 +44,86 @@ static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", GST_STATIC_CAPS ("application/x-rtp") ); -#define NTP_OFFSET ((guint64) 1245) -#define TIMESTAMP ((GstClockTime) 42) +#define NTP_OFFSET ((guint64) 1245) +#define TIMESTAMP ((GstClockTime)42) +#define COMPARE TRUE +#define NO_COMPARE FALSE + +static GstPadProbeReturn +event_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) +{ + GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info); + + GST_DEBUG ("got %s", GST_EVENT_TYPE_NAME (event)); + myreceivedevents = g_list_append (myreceivedevents, gst_event_ref (event)); + + return GST_PAD_PROBE_OK; +} + +static GstEvent * +create_event (GstEventType type) +{ + GstEvent *event = NULL; + + switch (type) { + case GST_EVENT_CUSTOM_DOWNSTREAM: + event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM, + gst_structure_new ("x-app/test", "test-field", G_TYPE_STRING, + "test-value", NULL)); + break; + case GST_EVENT_CUSTOM_DOWNSTREAM_OOB: + event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_OOB, + gst_structure_new ("x-app/test", "test-field", G_TYPE_STRING, + "test-value", NULL)); + break; + case GST_EVENT_EOS: + event = gst_event_new_eos (); + break; + default: + g_assert_not_reached (); + break; + } + + return event; +} + +static void +create_and_push_event (GstEventType type) +{ + GstEvent *event = create_event (type); + + mypushedevents = g_list_append (mypushedevents, event); + fail_unless (gst_pad_push_event (mysrcpad, event)); +} + +static void +check_and_clear_events (gint expected, gboolean compare) +{ + GList *p; + GList *r; + + /* verify that there's as many queued events as expected */ + fail_unless_equals_int (g_list_length (myreceivedevents), expected); + + if (compare) { + fail_unless_equals_int (expected, g_list_length (mypushedevents)); + + /* verify that the events are queued in the expected order */ + r = myreceivedevents; + p = mypushedevents; + + while (p != NULL) { + fail_unless_equals_pointer (p->data, r->data); + p = g_list_next (p); + r = g_list_next (r); + } + } + + g_list_free_full (myreceivedevents, (GDestroyNotify)gst_event_unref); + myreceivedevents = NULL; + g_list_free (mypushedevents); + mypushedevents = NULL; +} static void setup (void) @@ -54,7 +137,6 @@ setup (void) gst_pad_set_active (mysrcpad, TRUE); } - static void cleanup (void) { @@ -70,6 +152,30 @@ cleanup (void) gst_check_teardown_element (element); element = NULL; + + gst_check_drop_buffers (); +} + +static void +setup_with_event (void) +{ + setup (); + + myprobe = gst_pad_add_probe (mysinkpad, + GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, event_probe, NULL, NULL); + myreceivedevents = NULL; + mypushedevents = NULL; +} + +static void +cleanup_with_event (void) +{ + gst_pad_remove_probe (mysinkpad, myprobe); + myprobe = 0; + myreceivedevents = NULL; + mypushedevents = NULL; + + cleanup (); } static void @@ -290,22 +396,263 @@ GST_START_TEST (test_apply_e_bit) GST_END_TEST; -static Suite * -onviftimestamp_suite (void) +GST_START_TEST (test_flushing) { - Suite *s = suite_create ("onviftimestamp"); - TCase *tc_chain; + GstBuffer *buffer; + + /* set the e-bit, so the element use caching */ + g_object_set (element, "set-e-bit", TRUE, NULL); + /* set the ntp-offset, since no one will provide a clock */ + g_object_set (element, "ntp-offset", NTP_OFFSET, NULL); + + ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS); + gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME); + + /* create and push the first buffer */ + buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE); + fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK); + + /* no buffers should have made it through */ + fail_unless_equals_int (g_list_length (buffers), 0); + + /* flush the element */ + fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_flush_start ())); + fail_unless (gst_pad_push_event (mysrcpad, gst_event_new_flush_stop (FALSE))); + + /* resend events */ + gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME); + + /* create and push a second buffer */ + buffer = create_rtp_buffer (TIMESTAMP + 1, TRUE, FALSE); + fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK); + + /* still no buffers should have made it through (the first one should have + * been dropped during flushing) */ + fail_unless_equals_int (g_list_length (buffers), 0); + + ASSERT_SET_STATE (element, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); +} + +GST_END_TEST; + +GST_START_TEST (test_reusable_element_no_e_bit) +{ + GstBuffer *buffer; + + /* set the ntp-offset, since no one will provide a clock */ + g_object_set (element, "ntp-offset", NTP_OFFSET, NULL); + + ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS); + gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME); + + /* create and push the first buffer */ + buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE); + fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK); + + /* create and push a second buffer */ + buffer = create_rtp_buffer (TIMESTAMP + 1, TRUE, FALSE); + fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK); + + /* create and push a third buffer */ + buffer = create_rtp_buffer (TIMESTAMP + 2, TRUE, FALSE); + fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK); + + ASSERT_SET_STATE (element, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); + + fail_unless_equals_int (g_list_length (buffers), 3); + + ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS); + gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME); - tc_chain = tcase_create ("apply"); + /* create and push the first buffer */ + buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE); + fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK); - suite_add_tcase (s, tc_chain); - tcase_add_checked_fixture (tc_chain, setup, cleanup); + /* create and push a second buffer */ + buffer = create_rtp_buffer (TIMESTAMP + 1, TRUE, FALSE); + fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK); - tcase_add_test (tc_chain, test_apply_discont); - tcase_add_test (tc_chain, test_apply_not_discont); - tcase_add_test (tc_chain, test_apply_clean_point); - tcase_add_test (tc_chain, test_apply_no_e_bit); - tcase_add_test (tc_chain, test_apply_e_bit); + /* create and push a third buffer */ + buffer = create_rtp_buffer (TIMESTAMP + 2, TRUE, FALSE); + fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK); + + ASSERT_SET_STATE (element, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); + + fail_unless_equals_int (g_list_length (buffers), 6); +} + +GST_END_TEST; + +GST_START_TEST (test_reusable_element_e_bit) +{ + GstBuffer *buffer; + + /* set the e-bit, so the element use caching */ + g_object_set (element, "set-e-bit", TRUE, NULL); + /* set the ntp-offset, since no one will provide a clock */ + g_object_set (element, "ntp-offset", NTP_OFFSET, NULL); + + ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS); + gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME); + + /* create and push the first buffer */ + buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE); + fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK); + + /* create and push a second buffer */ + buffer = create_rtp_buffer (TIMESTAMP + 1, TRUE, FALSE); + fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK); + + /* create and push a third buffer */ + buffer = create_rtp_buffer (TIMESTAMP + 2, TRUE, FALSE); + fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK); + + ASSERT_SET_STATE (element, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); + + fail_unless_equals_int (g_list_length (buffers), 2); + + ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS); + gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME); + + /* create and push the first buffer */ + buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE); + fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK); + + /* create and push a second buffer */ + buffer = create_rtp_buffer (TIMESTAMP + 1, TRUE, FALSE); + fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK); + + /* create and push a third buffer */ + buffer = create_rtp_buffer (TIMESTAMP + 2, TRUE, FALSE); + fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK); + + ASSERT_SET_STATE (element, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS); + + fail_unless_equals_int (g_list_length (buffers), 4); +} + +GST_END_TEST; + +GST_START_TEST (test_serialized_events) +{ + GstBuffer *buffer; + + /* we want the e-bit set so that buffers are cached */ + g_object_set (element, "set-e-bit", TRUE, NULL); + g_object_set (element, "ntp-offset", NTP_OFFSET, NULL); + + ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS); + + /* send intitial events (stream-start and segment) */ + gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME); + check_and_clear_events (2, NO_COMPARE); + + /* events received while no buffer is cached should be forwarded */ + create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM); + check_and_clear_events (1, NO_COMPARE); + + /* create and push the first buffer, which should be cached */ + buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE); + fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK); + fail_unless_equals_int (g_list_length (buffers), 0); + /* serialized events should be queued when there's a buffer cached */ + create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM); + fail_unless_equals_int (g_list_length (myreceivedevents), 0); + /* there's still a buffer cached... */ + create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM); + fail_unless_equals_int (g_list_length (myreceivedevents), 0); + + /* receiving a new buffer should let the first through, along with the + * queued serialized events */ + buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE); + fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK); + fail_unless_equals_int (g_list_length (buffers), 1); + check_and_clear_events (2, COMPARE); + + /* there's still a buffer cached, a new serialized event should be quueud */ + create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM); + fail_unless_equals_int (g_list_length (myreceivedevents), 0); + + /* when receiving an EOS cached buffer and queued events should be forwarded */ + create_and_push_event (GST_EVENT_EOS); + check_and_clear_events (2, COMPARE); +} + +GST_END_TEST; + +GST_START_TEST (test_non_serialized_events) +{ + GstEvent *event; + GstBuffer *buffer; + + /* we want the e-bit set so that buffers are cached */ + g_object_set (element, "set-e-bit", TRUE, NULL); + g_object_set (element, "ntp-offset", NTP_OFFSET, NULL); + + ASSERT_SET_STATE (element, GST_STATE_PLAYING, GST_STATE_CHANGE_SUCCESS); + + /* send intitial events (stream-start and segment) */ + gst_check_setup_events (mysrcpad, element, NULL, GST_FORMAT_TIME); + fail_unless_equals_int (g_list_length (myreceivedevents), 2); + check_and_clear_events (2, NO_COMPARE); + + /* events received while no buffer is cached should be forwarded */ + create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM_OOB); + check_and_clear_events (1, COMPARE); + + /* create and push the first buffer, which should be cached */ + buffer = create_rtp_buffer (TIMESTAMP, TRUE, FALSE); + fail_unless_equals_int (gst_pad_push (mysrcpad, buffer), GST_FLOW_OK); + fail_unless_equals_int (g_list_length (buffers), 0); + /* non-serialized events should be forwarded regardless of whether + * there is a cached buffer */ + create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM_OOB); + check_and_clear_events (1, COMPARE); + + /* there's still a buffer cached, push a serialized event and make sure + * it's queued */ + create_and_push_event (GST_EVENT_CUSTOM_DOWNSTREAM); + fail_unless_equals_int (g_list_length (myreceivedevents), 0); + /* non-serialized events should be forwarded regardless of whether there + * are serialized events queued, thus the g_list_prepend below */ + event = create_event (GST_EVENT_CUSTOM_DOWNSTREAM_OOB); + mypushedevents = g_list_prepend (mypushedevents, event); + fail_unless (gst_pad_push_event (mysrcpad, event)); + fail_unless_equals_int (g_list_length (myreceivedevents), 1); + + /* when receiving an EOS cached buffer and queued events should be forwarded */ + create_and_push_event (GST_EVENT_EOS); + fail_unless_equals_int (g_list_length (buffers), 1); + check_and_clear_events (3, COMPARE); +} + +GST_END_TEST; + +static Suite * +onviftimestamp_suite (void) +{ + Suite *s = suite_create ("onviftimestamp"); + TCase *tc_general, *tc_events; + + tc_general = tcase_create ("apply"); + suite_add_tcase (s, tc_general); + tcase_add_checked_fixture (tc_general, setup, cleanup); + + tcase_add_test (tc_general, test_apply_discont); + tcase_add_test (tc_general, test_apply_not_discont); + tcase_add_test (tc_general, test_apply_clean_point); + tcase_add_test (tc_general, test_apply_no_e_bit); + tcase_add_test (tc_general, test_apply_e_bit); + tcase_add_test (tc_general, test_flushing); + tcase_add_test (tc_general, test_reusable_element_no_e_bit); + tcase_add_test (tc_general, test_reusable_element_e_bit); + + tc_events = tcase_create ("events"); + suite_add_tcase (s, tc_events); + tcase_add_checked_fixture (tc_events, setup_with_event, cleanup_with_event); + + tcase_add_test (tc_events, test_serialized_events); + tcase_add_test (tc_events, test_non_serialized_events); return s; } -- 2.7.4