rtprtxsend: don't start the task unless we are doing rtx
authorHavard Graff <havard.graff@gmail.com>
Fri, 4 Nov 2016 10:47:20 +0000 (11:47 +0100)
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Tue, 15 Mar 2022 12:03:27 +0000 (12:03 +0000)
The rtxsend element can do pass-through when not enabled (no pt-map set)
and in those cases there is no point in starting an additional task
that does absolutely nothing.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1880>

subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxsend.c
subprojects/gst-plugins-good/tests/check/elements/rtprtx.c

index e873d45..4883ee7 100644 (file)
@@ -109,6 +109,8 @@ G_DEFINE_TYPE_WITH_CODE (GstRtpRtxSend, gst_rtp_rtx_send, GST_TYPE_ELEMENT,
 GST_ELEMENT_REGISTER_DEFINE (rtprtxsend, "rtprtxsend", GST_RANK_NONE,
     GST_TYPE_RTP_RTX_SEND);
 
+#define IS_RTX_ENABLED(rtx) (g_hash_table_size ((rtx)->rtx_pt_map) > 0)
+
 typedef struct
 {
   guint16 seqnum;
@@ -152,6 +154,60 @@ ssrc_rtx_data_free (SSRCRtxData * data)
   g_slice_free (SSRCRtxData, data);
 }
 
+typedef enum
+{
+  RTX_TASK_START,
+  RTX_TASK_PAUSE,
+  RTX_TASK_STOP,
+} RtxTaskState;
+
+static void
+gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush)
+{
+  GST_OBJECT_LOCK (rtx);
+  gst_data_queue_set_flushing (rtx->queue, flush);
+  gst_data_queue_flush (rtx->queue);
+  GST_OBJECT_UNLOCK (rtx);
+}
+
+static gboolean
+gst_rtp_rtx_send_set_task_state (GstRtpRtxSend * rtx, RtxTaskState task_state)
+{
+  GstTask *task = GST_PAD_TASK (rtx->srcpad);
+  GstPadMode mode = GST_PAD_MODE (rtx->srcpad);
+  gboolean ret = TRUE;
+
+  switch (task_state) {
+    case RTX_TASK_START:
+    {
+      gboolean active = task && GST_TASK_STATE (task) == GST_TASK_STARTED;
+      if (IS_RTX_ENABLED (rtx) && mode != GST_PAD_MODE_NONE && !active) {
+        GST_DEBUG_OBJECT (rtx, "Starting RTX task");
+        gst_rtp_rtx_send_set_flushing (rtx, FALSE);
+        ret = gst_pad_start_task (rtx->srcpad,
+            (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
+      }
+      break;
+    }
+    case RTX_TASK_PAUSE:
+      if (task) {
+        GST_DEBUG_OBJECT (rtx, "Pausing RTX task");
+        gst_rtp_rtx_send_set_flushing (rtx, TRUE);
+        ret = gst_pad_pause_task (rtx->srcpad);
+      }
+      break;
+    case RTX_TASK_STOP:
+      if (task) {
+        GST_DEBUG_OBJECT (rtx, "Stopping RTX task");
+        gst_rtp_rtx_send_set_flushing (rtx, TRUE);
+        ret = gst_pad_stop_task (rtx->srcpad);
+      }
+      break;
+  }
+
+  return ret;
+}
+
 static void
 gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass)
 {
@@ -287,15 +343,6 @@ gst_rtp_rtx_send_init (GstRtpRtxSend * rtx)
   rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
 }
 
-static void
-gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush)
-{
-  GST_OBJECT_LOCK (rtx);
-  gst_data_queue_set_flushing (rtx->queue, flush);
-  gst_data_queue_flush (rtx->queue);
-  GST_OBJECT_UNLOCK (rtx);
-}
-
 static gboolean
 gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
     guint visible, guint bytes, guint64 time, gpointer checkdata)
@@ -609,14 +656,11 @@ gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
       gst_pad_push_event (rtx->srcpad, event);
-      gst_rtp_rtx_send_set_flushing (rtx, TRUE);
-      gst_pad_pause_task (rtx->srcpad);
+      gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_PAUSE);
       return TRUE;
     case GST_EVENT_FLUSH_STOP:
       gst_pad_push_event (rtx->srcpad, event);
-      gst_rtp_rtx_send_set_flushing (rtx, FALSE);
-      gst_pad_start_task (rtx->srcpad,
-          (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
+      gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_START);
       return TRUE;
     case GST_EVENT_EOS:
       GST_INFO_OBJECT (rtx, "Got EOS - enqueueing it");
@@ -837,7 +881,7 @@ gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx)
     data->destroy (data);
   } else {
     GST_LOG_OBJECT (rtx, "flushing");
-    gst_pad_pause_task (rtx->srcpad);
+    gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_PAUSE);
   }
 }
 
@@ -851,12 +895,9 @@ gst_rtp_rtx_send_activate_mode (GstPad * pad, GstObject * parent,
   switch (mode) {
     case GST_PAD_MODE_PUSH:
       if (active) {
-        gst_rtp_rtx_send_set_flushing (rtx, FALSE);
-        ret = gst_pad_start_task (rtx->srcpad,
-            (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
+        ret = gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_START);
       } else {
-        gst_rtp_rtx_send_set_flushing (rtx, TRUE);
-        ret = gst_pad_stop_task (rtx->srcpad);
+        ret = gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_STOP);
       }
       GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret);
       break;
@@ -948,6 +989,12 @@ gst_rtp_rtx_send_set_property (GObject * object,
       gst_structure_foreach (rtx->rtx_pt_map_structure, structure_to_hash_table,
           rtx->rtx_pt_map);
       GST_OBJECT_UNLOCK (rtx);
+
+      if (IS_RTX_ENABLED (rtx))
+        gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_START);
+      else
+        gst_rtp_rtx_send_set_task_state (rtx, RTX_TASK_STOP);
+
       break;
     case PROP_MAX_SIZE_TIME:
       GST_OBJECT_LOCK (rtx);
index 2e4c651..64f4383 100644 (file)
@@ -146,6 +146,141 @@ create_rtp_buffer_with_timestamp (guint32 ssrc, guint8 payload_type,
   return ret;
 }
 
+static GstStructure *
+create_rtx_map (const gchar * name, guint key, guint value)
+{
+  gchar *key_str = g_strdup_printf ("%u", key);
+  GstStructure *s = gst_structure_new (name,
+      key_str, G_TYPE_UINT, value, NULL);
+  g_free (key_str);
+  return s;
+}
+
+GST_START_TEST (test_rtxsend_basic)
+{
+  const guint32 main_ssrc = 1234567;
+  const guint main_pt = 96;
+  const guint32 rtx_ssrc = 7654321;
+  const guint rtx_pt = 106;
+
+  GstHarness *h = gst_harness_new ("rtprtxsend");
+  GstStructure *ssrc_map =
+      create_rtx_map ("application/x-rtp-ssrc-map", main_ssrc, rtx_ssrc);
+  GstStructure *pt_map =
+      create_rtx_map ("application/x-rtp-pt-map", main_pt, rtx_pt);
+
+  g_object_set (h->element, "ssrc-map", ssrc_map, NULL);
+  g_object_set (h->element, "payload-type-map", pt_map, NULL);
+
+  gst_harness_set_src_caps_str (h, "application/x-rtp, "
+      "media = (string)video, payload = (int)96, "
+      "ssrc = (uint)1234567, clock-rate = (int)90000, "
+      "encoding-name = (string)RAW");
+
+  /* push a packet */
+  fail_unless_equals_int (GST_FLOW_OK,
+      gst_harness_push (h, create_rtp_buffer (main_ssrc, main_pt, 0)));
+
+  /* and check it came through */
+  pull_and_verify (h, FALSE, main_ssrc, main_pt, 0);
+
+  /* now request this packet as rtx */
+  gst_harness_push_upstream_event (h, create_rtx_event (main_ssrc, main_pt, 0));
+
+  /* and verify we got an rtx-packet for it */
+  pull_and_verify (h, TRUE, rtx_ssrc, rtx_pt, 0);
+
+  gst_structure_free (ssrc_map);
+  gst_structure_free (pt_map);
+  gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_rtxsend_disabled_enabled_disabled)
+{
+  const guint32 main_ssrc = 1234567;
+  const guint main_pt = 96;
+  const guint32 rtx_ssrc = 7654321;
+  const guint rtx_pt = 106;
+
+  GstHarness *h = gst_harness_new ("rtprtxsend");
+  GstStructure *ssrc_map =
+      create_rtx_map ("application/x-rtp-ssrc-map", main_ssrc, rtx_ssrc);
+  GstStructure *pt_map =
+      create_rtx_map ("application/x-rtp-pt-map", main_pt, rtx_pt);
+  GstStructure *empty_pt_map =
+      gst_structure_new_empty ("application/x-rtp-pt-map");
+
+  /* set ssrc-map, but not pt-map, making the element work in passthrough */
+  g_object_set (h->element, "ssrc-map", ssrc_map, NULL);
+
+  gst_harness_set_src_caps_str (h, "application/x-rtp, "
+      "media = (string)video, payload = (int)96, "
+      "ssrc = (uint)1234567, clock-rate = (int)90000, "
+      "encoding-name = (string)RAW");
+
+  /* push, pull, request-rtx, verify nothing arrives */
+  fail_unless_equals_int (GST_FLOW_OK,
+      gst_harness_push (h, create_rtp_buffer (main_ssrc, main_pt, 0)));
+  pull_and_verify (h, FALSE, main_ssrc, main_pt, 0);
+  gst_harness_push_upstream_event (h, create_rtx_event (main_ssrc, main_pt, 0));
+  fail_unless_equals_int (0, gst_harness_buffers_in_queue (h));
+  /* verify there is no task on the rtxsend srcpad */
+  fail_unless (GST_PAD_TASK (GST_PAD_PEER (h->sinkpad)) == NULL);
+
+  /* now enable rtx by setting the pt-map */
+  g_object_set (h->element, "payload-type-map", pt_map, NULL);
+
+  /* push, pull, request rtx, pull rtx */
+  fail_unless_equals_int (GST_FLOW_OK,
+      gst_harness_push (h, create_rtp_buffer (main_ssrc, main_pt, 1)));
+  pull_and_verify (h, FALSE, main_ssrc, main_pt, 1);
+  gst_harness_push_upstream_event (h, create_rtx_event (main_ssrc, main_pt, 1));
+  pull_and_verify (h, TRUE, rtx_ssrc, rtx_pt, 1);
+  /* verify there is a task on the rtxsend srcpad */
+  fail_unless (GST_PAD_TASK (GST_PAD_PEER (h->sinkpad)) != NULL);
+
+  /* now enable disable rtx agian by setting an empty pt-map */
+  g_object_set (h->element, "payload-type-map", empty_pt_map, NULL);
+
+  /* push, pull, request-rtx, verify nothing arrives */
+  fail_unless_equals_int (GST_FLOW_OK,
+      gst_harness_push (h, create_rtp_buffer (main_ssrc, main_pt, 2)));
+  pull_and_verify (h, FALSE, main_ssrc, main_pt, 2);
+  gst_harness_push_upstream_event (h, create_rtx_event (main_ssrc, main_pt, 2));
+  fail_unless_equals_int (0, gst_harness_buffers_in_queue (h));
+  /* verify the task is gone again */
+  fail_unless (GST_PAD_TASK (GST_PAD_PEER (h->sinkpad)) == NULL);
+
+  gst_structure_free (ssrc_map);
+  gst_structure_free (pt_map);
+  gst_structure_free (empty_pt_map);
+  gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_rtxsend_configured_not_playing_cleans_up)
+{
+  GstElement *rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
+  GstStructure *ssrc_map =
+      create_rtx_map ("application/x-rtp-ssrc-map", 123, 96);
+  GstStructure *pt_map = create_rtx_map ("application/x-rtp-pt-map", 321, 106);
+
+  g_object_set (rtxsend, "ssrc-map", ssrc_map, NULL);
+  g_object_set (rtxsend, "payload-type-map", pt_map, NULL);
+  gst_structure_free (ssrc_map);
+  gst_structure_free (pt_map);
+
+  g_usleep (G_USEC_PER_SEC);
+
+  gst_object_unref (rtxsend);
+}
+
+GST_END_TEST;
+
+
 GST_START_TEST (test_rtxreceive_empty_rtx_packet)
 {
   guint rtx_ssrc = 7654321;
@@ -788,6 +923,10 @@ rtprtx_suite (void)
 
   suite_add_tcase (s, tc_chain);
 
+  tcase_add_test (tc_chain, test_rtxsend_basic);
+  tcase_add_test (tc_chain, test_rtxsend_disabled_enabled_disabled);
+  tcase_add_test (tc_chain, test_rtxsend_configured_not_playing_cleans_up);
+
   tcase_add_test (tc_chain, test_rtxreceive_empty_rtx_packet);
   tcase_add_test (tc_chain, test_rtxsend_rtxreceive);
   tcase_add_test (tc_chain, test_rtxsend_rtxreceive_with_packet_loss);