rtpfunnel: Also forward custom sticky event
authorOlivier Crête <olivier.crete@collabora.com>
Wed, 8 Jul 2020 21:28:31 +0000 (17:28 -0400)
committerOlivier Crête <olivier.crete@ocrete.ca>
Tue, 6 Oct 2020 20:57:49 +0000 (20:57 +0000)
This is useful to track metadata about each group of packets

Also include a unit test

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/merge_requests/666>

gst/rtpmanager/gstrtpfunnel.c
tests/check/elements/rtpfunnel.c

index 7638b34..6bd86b0 100644 (file)
@@ -189,18 +189,28 @@ done:
 static void
 gst_rtp_funnel_forward_segment (GstRtpFunnel * funnel, GstPad * pad)
 {
-  GstEvent *segment;
+  GstEvent *event;
+  guint i;
 
   if (pad == funnel->current_pad) {
     goto done;
   }
 
-  segment = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
-  if (segment && !gst_pad_push_event (funnel->srcpad, segment)) {
+  event = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0);
+  if (event && !gst_pad_push_event (funnel->srcpad, event)) {
     GST_ERROR_OBJECT (funnel, "Could not push segment");
     goto done;
   }
 
+  for (i = 0;; i++) {
+    event = gst_pad_get_sticky_event (pad, GST_EVENT_CUSTOM_DOWNSTREAM_STICKY,
+        i);
+    if (event == NULL)
+      break;
+    if (!gst_pad_push_event (funnel->srcpad, event))
+      GST_ERROR_OBJECT (funnel, "Could not push custom event");
+  }
+
   funnel->current_pad = pad;
 
 done:
index 5224dce..1b22f7c 100644 (file)
@@ -127,6 +127,125 @@ GST_START_TEST (rtpfunnel_common_ts_offset)
 
 GST_END_TEST;
 
+static GstBuffer *
+generate_test_buffer (guint seqnum, guint ssrc, guint8 twcc_ext_id)
+{
+  GstBuffer *buf;
+  GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+
+  buf = gst_rtp_buffer_new_allocate (0, 0, 0);
+  GST_BUFFER_PTS (buf) = seqnum * 20 * GST_MSECOND;
+  GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf);
+
+  gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp);
+  gst_rtp_buffer_set_payload_type (&rtp, 100);
+  gst_rtp_buffer_set_seq (&rtp, seqnum);
+  gst_rtp_buffer_set_timestamp (&rtp, seqnum * 160);
+  gst_rtp_buffer_set_ssrc (&rtp, ssrc);
+
+  if (twcc_ext_id > 0) {
+    guint16 data;
+    GST_WRITE_UINT16_BE (&data, seqnum);
+    gst_rtp_buffer_add_extension_onebyte_header (&rtp, twcc_ext_id,
+        &data, sizeof (guint16));
+  }
+
+  gst_rtp_buffer_unmap (&rtp);
+
+  return buf;
+}
+
+GST_START_TEST (rtpfunnel_custom_sticky)
+{
+  GstHarness *h, *h0, *h1;
+  GstEvent *event;
+  const GstStructure *s;
+  const gchar *value = NULL;
+
+  h = gst_harness_new_with_padnames ("rtpfunnel", NULL, "src");
+
+  /* request a sinkpad, with some caps */
+  h0 = gst_harness_new_with_element (h->element, "sink_0", NULL);
+  gst_harness_set_src_caps_str (h0, "application/x-rtp, " "ssrc=(uint)123");
+
+  /* request a second sinkpad, also with caps */
+  h1 = gst_harness_new_with_element (h->element, "sink_1", NULL);
+  gst_harness_set_src_caps_str (h1, "application/x-rtp, " "ssrc=(uint)456");
+
+  while ((event = gst_harness_try_pull_event (h)))
+    gst_event_unref (event);
+
+  fail_unless (gst_harness_push_event (h0,
+          gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY,
+              gst_structure_new ("test", "key", G_TYPE_STRING, "value0",
+                  NULL))));
+
+  fail_unless (gst_harness_push_event (h1,
+          gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY,
+              gst_structure_new ("test", "key", G_TYPE_STRING, "value1",
+                  NULL))));
+
+  /* Send a buffer through first pad, expect the event to be the first one */
+  fail_unless_equals_int (GST_FLOW_OK,
+      gst_harness_push (h0, generate_test_buffer (500, 123, 0)));
+  for (;;) {
+    event = gst_harness_pull_event (h);
+    if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_DOWNSTREAM_STICKY)
+      break;
+    gst_event_unref (event);
+  }
+  s = gst_event_get_structure (event);
+  fail_unless (s);
+  fail_unless (gst_structure_has_name (s, "test"));
+  value = gst_structure_get_string (s, "key");
+  fail_unless_equals_string (value, "value0");
+  gst_event_unref (event);
+  gst_buffer_unref (gst_harness_pull (h));
+
+  /* Send a buffer through second pad, expect the event to be the second one
+   */
+  fail_unless_equals_int (GST_FLOW_OK,
+      gst_harness_push (h1, generate_test_buffer (500, 123, 0)));
+  for (;;) {
+    event = gst_harness_pull_event (h);
+    if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_DOWNSTREAM_STICKY)
+      break;
+    gst_event_unref (event);
+  }
+  s = gst_event_get_structure (event);
+  fail_unless (s);
+  fail_unless (gst_structure_has_name (s, "test"));
+  value = gst_structure_get_string (s, "key");
+  fail_unless_equals_string (value, "value1");
+  gst_event_unref (event);
+  gst_buffer_unref (gst_harness_pull (h));
+
+  /* Send a buffer through first pad, expect the event to again be the first
+   * one
+   */
+  fail_unless_equals_int (GST_FLOW_OK,
+      gst_harness_push (h0, generate_test_buffer (500, 123, 5)));
+  for (;;) {
+    event = gst_harness_pull_event (h);
+    if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_DOWNSTREAM_STICKY)
+      break;
+    gst_event_unref (event);
+  }
+  s = gst_event_get_structure (event);
+  fail_unless (s);
+  fail_unless (gst_structure_has_name (s, "test"));
+  value = gst_structure_get_string (s, "key");
+  fail_unless_equals_string (value, "value0");
+  gst_event_unref (event);
+  gst_buffer_unref (gst_harness_pull (h));
+
+  gst_harness_teardown (h);
+  gst_harness_teardown (h0);
+  gst_harness_teardown (h1);
+}
+
+GST_END_TEST;
+
 GST_START_TEST (rtpfunnel_stress)
 {
   GstHarness *h = gst_harness_new_with_padnames ("rtpfunnel",
@@ -224,34 +343,6 @@ GST_START_TEST (rtpfunnel_twcc_caps)
 
 GST_END_TEST;
 
-static GstBuffer *
-generate_test_buffer (guint seqnum, guint ssrc, guint8 twcc_ext_id)
-{
-  GstBuffer *buf;
-  GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
-
-  buf = gst_rtp_buffer_new_allocate (0, 0, 0);
-  GST_BUFFER_PTS (buf) = seqnum * 20 * GST_MSECOND;
-  GST_BUFFER_DTS (buf) = GST_BUFFER_PTS (buf);
-
-  gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp);
-  gst_rtp_buffer_set_payload_type (&rtp, 100);
-  gst_rtp_buffer_set_seq (&rtp, seqnum);
-  gst_rtp_buffer_set_timestamp (&rtp, seqnum * 160);
-  gst_rtp_buffer_set_ssrc (&rtp, ssrc);
-
-  if (twcc_ext_id > 0) {
-    guint16 data;
-    GST_WRITE_UINT16_BE (&data, seqnum);
-    gst_rtp_buffer_add_extension_onebyte_header (&rtp, twcc_ext_id,
-        &data, sizeof (guint16));
-  }
-
-  gst_rtp_buffer_unmap (&rtp);
-
-  return buf;
-}
-
 static gint32
 get_twcc_seqnum (GstBuffer * buf, guint8 ext_id)
 {
@@ -408,6 +499,7 @@ rtpfunnel_suite (void)
   tcase_add_test (tc_chain, rtpfunnel_ssrc_demuxing);
   tcase_add_test (tc_chain, rtpfunnel_ssrc_downstream_not_leaking_through);
   tcase_add_test (tc_chain, rtpfunnel_common_ts_offset);
+  tcase_add_test (tc_chain, rtpfunnel_custom_sticky);
 
   tcase_add_test (tc_chain, rtpfunnel_stress);