GST_END_TEST;
+static GstBuffer *
+generate_test_buffer (guint seqnum, guint ssrc, guint8 twcc_ext_id)
+{
+ GstBuffer *buf;
+ GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+
+ buf = gst_rtp_buffer_new_allocate (0, 0, 0);
+ GST_BUFFER_PTS (buf) = seqnum * 20 * GST_MSECOND;
+ GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf);
+
+ gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp);
+ gst_rtp_buffer_set_payload_type (&rtp, 100);
+ gst_rtp_buffer_set_seq (&rtp, seqnum);
+ gst_rtp_buffer_set_timestamp (&rtp, seqnum * 160);
+ gst_rtp_buffer_set_ssrc (&rtp, ssrc);
+
+ if (twcc_ext_id > 0) {
+ guint16 data;
+ GST_WRITE_UINT16_BE (&data, seqnum);
+ gst_rtp_buffer_add_extension_onebyte_header (&rtp, twcc_ext_id,
+ &data, sizeof (guint16));
+ }
+
+ gst_rtp_buffer_unmap (&rtp);
+
+ return buf;
+}
+
+GST_START_TEST (rtpfunnel_custom_sticky)
+{
+ GstHarness *h, *h0, *h1;
+ GstEvent *event;
+ const GstStructure *s;
+ const gchar *value = NULL;
+
+ h = gst_harness_new_with_padnames ("rtpfunnel", NULL, "src");
+
+ /* request a sinkpad, with some caps */
+ h0 = gst_harness_new_with_element (h->element, "sink_0", NULL);
+ gst_harness_set_src_caps_str (h0, "application/x-rtp, " "ssrc=(uint)123");
+
+ /* request a second sinkpad, also with caps */
+ h1 = gst_harness_new_with_element (h->element, "sink_1", NULL);
+ gst_harness_set_src_caps_str (h1, "application/x-rtp, " "ssrc=(uint)456");
+
+ while ((event = gst_harness_try_pull_event (h)))
+ gst_event_unref (event);
+
+ fail_unless (gst_harness_push_event (h0,
+ gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY,
+ gst_structure_new ("test", "key", G_TYPE_STRING, "value0",
+ NULL))));
+
+ fail_unless (gst_harness_push_event (h1,
+ gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY,
+ gst_structure_new ("test", "key", G_TYPE_STRING, "value1",
+ NULL))));
+
+ /* Send a buffer through first pad, expect the event to be the first one */
+ fail_unless_equals_int (GST_FLOW_OK,
+ gst_harness_push (h0, generate_test_buffer (500, 123, 0)));
+ for (;;) {
+ event = gst_harness_pull_event (h);
+ if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_DOWNSTREAM_STICKY)
+ break;
+ gst_event_unref (event);
+ }
+ s = gst_event_get_structure (event);
+ fail_unless (s);
+ fail_unless (gst_structure_has_name (s, "test"));
+ value = gst_structure_get_string (s, "key");
+ fail_unless_equals_string (value, "value0");
+ gst_event_unref (event);
+ gst_buffer_unref (gst_harness_pull (h));
+
+ /* Send a buffer through second pad, expect the event to be the second one
+ */
+ fail_unless_equals_int (GST_FLOW_OK,
+ gst_harness_push (h1, generate_test_buffer (500, 123, 0)));
+ for (;;) {
+ event = gst_harness_pull_event (h);
+ if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_DOWNSTREAM_STICKY)
+ break;
+ gst_event_unref (event);
+ }
+ s = gst_event_get_structure (event);
+ fail_unless (s);
+ fail_unless (gst_structure_has_name (s, "test"));
+ value = gst_structure_get_string (s, "key");
+ fail_unless_equals_string (value, "value1");
+ gst_event_unref (event);
+ gst_buffer_unref (gst_harness_pull (h));
+
+ /* Send a buffer through first pad, expect the event to again be the first
+ * one
+ */
+ fail_unless_equals_int (GST_FLOW_OK,
+ gst_harness_push (h0, generate_test_buffer (500, 123, 5)));
+ for (;;) {
+ event = gst_harness_pull_event (h);
+ if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_DOWNSTREAM_STICKY)
+ break;
+ gst_event_unref (event);
+ }
+ s = gst_event_get_structure (event);
+ fail_unless (s);
+ fail_unless (gst_structure_has_name (s, "test"));
+ value = gst_structure_get_string (s, "key");
+ fail_unless_equals_string (value, "value0");
+ gst_event_unref (event);
+ gst_buffer_unref (gst_harness_pull (h));
+
+ gst_harness_teardown (h);
+ gst_harness_teardown (h0);
+ gst_harness_teardown (h1);
+}
+
+GST_END_TEST;
+
GST_START_TEST (rtpfunnel_stress)
{
GstHarness *h = gst_harness_new_with_padnames ("rtpfunnel",
GST_END_TEST;
-static GstBuffer *
-generate_test_buffer (guint seqnum, guint ssrc, guint8 twcc_ext_id)
-{
- GstBuffer *buf;
- GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
-
- buf = gst_rtp_buffer_new_allocate (0, 0, 0);
- GST_BUFFER_PTS (buf) = seqnum * 20 * GST_MSECOND;
- GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf);
-
- gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp);
- gst_rtp_buffer_set_payload_type (&rtp, 100);
- gst_rtp_buffer_set_seq (&rtp, seqnum);
- gst_rtp_buffer_set_timestamp (&rtp, seqnum * 160);
- gst_rtp_buffer_set_ssrc (&rtp, ssrc);
-
- if (twcc_ext_id > 0) {
- guint16 data;
- GST_WRITE_UINT16_BE (&data, seqnum);
- gst_rtp_buffer_add_extension_onebyte_header (&rtp, twcc_ext_id,
- &data, sizeof (guint16));
- }
-
- gst_rtp_buffer_unmap (&rtp);
-
- return buf;
-}
-
static gint32
get_twcc_seqnum (GstBuffer * buf, guint8 ext_id)
{
tcase_add_test (tc_chain, rtpfunnel_ssrc_demuxing);
tcase_add_test (tc_chain, rtpfunnel_ssrc_downstream_not_leaking_through);
tcase_add_test (tc_chain, rtpfunnel_common_ts_offset);
+ tcase_add_test (tc_chain, rtpfunnel_custom_sticky);
tcase_add_test (tc_chain, rtpfunnel_stress);