rtprtxsend: allow generic input caps
authorMathieu Duponchelle <mathieu@centricular.com>
Mon, 27 Jan 2020 22:59:05 +0000 (23:59 +0100)
committerMathieu Duponchelle <mduponchelle1@gmail.com>
Tue, 28 Jan 2020 15:44:13 +0000 (15:44 +0000)
When connected to an upstream rtpfunnel element, payload-type,
ssrc and clock-rate will not be present in the received caps.

rtprtxsend can already deal with only the clock rate being
present there, a new property is exposed to allow users to
provide a payload-type -> clock-rate map, this enables the
use of the max-size-time property for bundled streams.

gst/rtpmanager/gstrtprtxsend.c
gst/rtpmanager/gstrtprtxsend.h
tests/check/elements/rtprtx.c

index 3303ca5..bfa5db8 100644 (file)
@@ -62,7 +62,8 @@ enum
   PROP_MAX_SIZE_TIME,
   PROP_MAX_SIZE_PACKETS,
   PROP_NUM_RTX_REQUESTS,
-  PROP_NUM_RTX_PACKETS
+  PROP_NUM_RTX_PACKETS,
+  PROP_CLOCK_RATE_MAP,
 };
 
 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
@@ -74,7 +75,7 @@ static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
     GST_PAD_SINK,
     GST_PAD_ALWAYS,
-    GST_STATIC_CAPS ("application/x-rtp, " "clock-rate = (int) [1, MAX]")
+    GST_STATIC_CAPS ("application/x-rtp")
     );
 
 static gboolean gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
@@ -192,6 +193,11 @@ gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass)
           " Number of retransmission packets sent", 0, G_MAXUINT,
           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_CLOCK_RATE_MAP,
+      g_param_spec_boxed ("clock-rate-map", "Clock Rate Map",
+          "Map of payload types to their clock rates",
+          GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
   gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
 
@@ -228,6 +234,9 @@ gst_rtp_rtx_send_finalize (GObject * object)
   g_hash_table_unref (rtx->rtx_pt_map);
   if (rtx->rtx_pt_map_structure)
     gst_structure_free (rtx->rtx_pt_map_structure);
+  g_hash_table_unref (rtx->clock_rate_map);
+  if (rtx->clock_rate_map_structure)
+    gst_structure_free (rtx->clock_rate_map_structure);
   g_object_unref (rtx->queue);
 
   G_OBJECT_CLASS (gst_rtp_rtx_send_parent_class)->finalize (object);
@@ -268,6 +277,7 @@ gst_rtp_rtx_send_init (GstRtpRtxSend * rtx)
       NULL, (GDestroyNotify) ssrc_rtx_data_free);
   rtx->rtx_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal);
   rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal);
+  rtx->clock_rate_map = g_hash_table_new (g_direct_hash, g_direct_equal);
 
   rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
   rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
@@ -625,6 +635,9 @@ gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       if (!gst_structure_get_int (s, "payload", &payload))
         payload = -1;
 
+      if (payload == -1 || ssrc == G_MAXUINT)
+        break;
+
       if (payload == -1)
         GST_WARNING_OBJECT (rtx, "No payload in caps");
 
@@ -723,6 +736,12 @@ process_buffer (GstRtpRtxSend * rtx, GstBuffer * buffer)
   if (g_hash_table_contains (rtx->rtx_pt_map, GUINT_TO_POINTER (payload_type))) {
     data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
 
+    if (data->clock_rate == 0 && rtx->clock_rate_map_structure) {
+      data->clock_rate =
+          GPOINTER_TO_INT (g_hash_table_lookup (rtx->clock_rate_map,
+              GUINT_TO_POINTER (payload_type)));
+    }
+
     /* add current rtp buffer to queue history */
     item = g_slice_new0 (BufferQueueItem);
     item->seqnum = seqnum;
@@ -871,6 +890,11 @@ gst_rtp_rtx_send_get_property (GObject * object,
       g_value_set_uint (value, rtx->num_rtx_packets);
       GST_OBJECT_UNLOCK (rtx);
       break;
+    case PROP_CLOCK_RATE_MAP:
+      GST_OBJECT_LOCK (rtx);
+      g_value_set_boxed (value, rtx->clock_rate_map_structure);
+      GST_OBJECT_UNLOCK (rtx);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -927,6 +951,16 @@ gst_rtp_rtx_send_set_property (GObject * object,
       rtx->max_size_packets = g_value_get_uint (value);
       GST_OBJECT_UNLOCK (rtx);
       break;
+    case PROP_CLOCK_RATE_MAP:
+      GST_OBJECT_LOCK (rtx);
+      if (rtx->clock_rate_map_structure)
+        gst_structure_free (rtx->clock_rate_map_structure);
+      rtx->clock_rate_map_structure = g_value_dup_boxed (value);
+      g_hash_table_remove_all (rtx->clock_rate_map);
+      gst_structure_foreach (rtx->clock_rate_map_structure,
+          structure_to_hash_table, rtx->clock_rate_map);
+      GST_OBJECT_UNLOCK (rtx);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
index 5f084e8..b25a511 100644 (file)
@@ -62,6 +62,11 @@ struct _GstRtpRtxSend
   /* orig pt (string) -> rtx pt (uint) */
   GstStructure *rtx_pt_map_structure;
 
+  /* orig pt (uint) -> clock rate (uint) */
+  GHashTable *clock_rate_map;
+  /* orig pt (string) -> clock rate (uint) */
+  GstStructure *clock_rate_map_structure;
+
   /* buffering control properties */
   guint max_size_time;
   guint max_size_packets;
index c021f29..b43c808 100644 (file)
@@ -652,6 +652,70 @@ GST_START_TEST (test_rtxqueue_max_size_time)
 
 GST_END_TEST;
 
+/* In this test, we verify the behaviour of rtprtxsend when
+ * generic caps are provided to its sink pad, this is useful
+ * when connected to an rtp funnel.
+ */
+GST_START_TEST (test_rtxsender_clock_rate_map)
+{
+  GstBuffer *inbuf, *outbuf;
+  guint master_ssrc = 1234567;
+  guint master_pt = 96;
+  guint rtx_pt = 99;
+  guint master_clock_rate = 90000;
+  GstStructure *pt_map;
+  GstStructure *clock_rate_map;
+  GstHarness *hsend = gst_harness_new ("rtprtxsend");
+
+  pt_map = gst_structure_new ("application/x-rtp-pt-map",
+      "96", G_TYPE_UINT, rtx_pt, NULL);
+  clock_rate_map = gst_structure_new ("application/x-rtp-clock-rate-map",
+      "96", G_TYPE_UINT, master_clock_rate, NULL);
+  g_object_set (hsend->element, "payload-type-map", pt_map,
+      "clock-rate-map", clock_rate_map, "max-size-time", 1000, NULL);
+  gst_structure_free (pt_map);
+  gst_structure_free (clock_rate_map);
+
+  gst_harness_set_src_caps_str (hsend, "application/x-rtp");
+
+  inbuf = create_rtp_buffer (master_ssrc, master_pt, 100);
+  gst_harness_push (hsend, inbuf);
+
+  outbuf = gst_harness_pull (hsend);
+  fail_unless (outbuf == inbuf);
+  gst_buffer_unref (outbuf);
+
+  gst_harness_push_upstream_event (hsend, create_rtx_event (master_ssrc,
+          master_pt, 100));
+
+  outbuf = gst_harness_pull (hsend);
+  fail_unless (outbuf);
+  gst_buffer_unref (outbuf);
+
+  fail_unless_equals_int (gst_harness_buffers_in_queue (hsend), 0);
+
+  /* Thanks to the provided clock rate, rtprtxsend should be able to
+   * determine that the previously pushed buffer should be cleared from
+   * its rtx queue */
+  inbuf = create_rtp_buffer (master_ssrc, master_pt, 131);
+  gst_harness_push (hsend, inbuf);
+
+  outbuf = gst_harness_pull (hsend);
+  fail_unless (outbuf == inbuf);
+  gst_buffer_unref (outbuf);
+
+  fail_unless_equals_int (gst_harness_buffers_in_queue (hsend), 0);
+
+  gst_harness_push_upstream_event (hsend, create_rtx_event (master_ssrc,
+          master_pt, 100));
+
+  fail_unless_equals_int (gst_harness_buffers_in_queue (hsend), 0);
+
+  gst_harness_teardown (hsend);
+}
+
+GST_END_TEST;
+
 static Suite *
 rtprtx_suite (void)
 {
@@ -669,6 +733,7 @@ rtprtx_suite (void)
   tcase_add_test (tc_chain, test_rtxsender_max_size_time);
   tcase_add_test (tc_chain, test_rtxqueue_max_size_packets);
   tcase_add_test (tc_chain, test_rtxqueue_max_size_time);
+  tcase_add_test (tc_chain, test_rtxsender_clock_rate_map);
 
   return s;
 }