rtpbasedepayload: Add max-reorder property
authorStian Selnes <stian@pexip.com>
Thu, 13 Jun 2019 08:36:05 +0000 (10:36 +0200)
committerSebastian Dröge <sebastian@centricular.com>
Thu, 13 Jun 2019 16:41:11 +0000 (19:41 +0300)
Add max-reorder property to make the old hard coded reordering limit of
100 configurable. It's particularly useful in some scenarios to set
max-reorder=0 to disable the behavior that the depayloader will drop
packets.

Note that although the default value is 100, the default limit has
increased with one because of the changed if-test. This was done to
allow the max-reorder value to be more intuitive. See tests.

gst-libs/gst/rtp/gstrtpbasedepayload.c
tests/check/libs/rtpbasedepayload.c

index ddc913c..f577e22 100644 (file)
@@ -52,6 +52,7 @@ struct _GstRTPBaseDepayloadPrivate
   guint32 last_seqnum;
   guint32 last_rtptime;
   guint32 next_seqnum;
+  gint max_reorder;
 
   gboolean negotiated;
 
@@ -71,12 +72,14 @@ enum
 };
 
 #define DEFAULT_SOURCE_INFO FALSE
+#define DEFAULT_MAX_REORDER 100
 
 enum
 {
   PROP_0,
   PROP_STATS,
   PROP_SOURCE_INFO,
+  PROP_MAX_REORDER,
   PROP_LAST
 };
 
@@ -202,6 +205,21 @@ gst_rtp_base_depayload_class_init (GstRTPBaseDepayloadClass * klass)
           "Add RTP source information as buffer meta",
           DEFAULT_SOURCE_INFO, G_PARAM_READWRITE));
 
+  /**
+   * GstRTPBaseDepayload:max-reorder:
+   *
+   * Max seqnum reorder before the sender is assumed to have restarted.
+   *
+   * When max-reorder is set to 0 all reordered/duplicate packets are
+   * considered coming from a restarted sender.
+   *
+   * Since: 1.18
+   **/
+  g_object_class_install_property (gobject_class, PROP_MAX_REORDER,
+      g_param_spec_int ("max-reorder", "Max Reorder",
+          "Max seqnum reorder before assuming sender has restarted",
+          0, G_MAXINT, DEFAULT_MAX_REORDER, G_PARAM_READWRITE));
+
   gstelement_class->change_state = gst_rtp_base_depayload_change_state;
 
   klass->packet_lost = gst_rtp_base_depayload_packet_lost;
@@ -251,6 +269,7 @@ gst_rtp_base_depayload_init (GstRTPBaseDepayload * filter,
   priv->pts = -1;
   priv->duration = -1;
   priv->source_info = DEFAULT_SOURCE_INFO;
+  priv->max_reorder = DEFAULT_MAX_REORDER;
 
   gst_segment_init (&filter->segment, GST_FORMAT_UNDEFINED);
 }
@@ -417,15 +436,19 @@ gst_rtp_base_depayload_handle_buffer (GstRTPBaseDepayload * filter,
           GST_LOG_OBJECT (filter, "%d missing packets", gap);
           discont = TRUE;
         } else {
-          /* seqnum < next_seqnum, we have seen this packet before or the sender
-           * could be restarted. If the packet is not too old, we throw it away as
-           * a duplicate, otherwise we mark discont and continue. 100 misordered
-           * packets is a good threshold. See also RFC 4737. */
-          if (gap < 100)
+          /* seqnum < next_seqnum, we have seen this packet before, have a
+           * reordered packet or the sender could be restarted. If the packet
+           * is not too old, we throw it away as a duplicate. Otherwise we
+           * mark discont and continue assuming the sender has restarted. See
+           * also RFC 4737. */
+          GST_WARNING ("gap %d <= priv->max_reorder %d -> dropping %d",
+              gap, priv->max_reorder, gap <= priv->max_reorder);
+          if (gap <= priv->max_reorder)
             goto dropping;
 
           GST_LOG_OBJECT (filter,
-              "%d > 100, packet too old, sender likely restarted", gap);
+              "%d > %d, packet too old, sender likely restarted", gap,
+              priv->max_reorder);
           discont = TRUE;
         }
       }
@@ -510,7 +533,8 @@ invalid_buffer:
 dropping:
   {
     gst_rtp_buffer_unmap (&rtp);
-    GST_WARNING_OBJECT (filter, "%d <= 100, dropping old packet", gap);
+    GST_WARNING_OBJECT (filter, "%d <= %d, dropping old packet", gap,
+        priv->max_reorder);
     gst_buffer_unref (in);
     return GST_FLOW_OK;
   }
@@ -1037,14 +1061,19 @@ gst_rtp_base_depayload_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec)
 {
   GstRTPBaseDepayload *depayload;
+  GstRTPBaseDepayloadPrivate *priv;
 
   depayload = GST_RTP_BASE_DEPAYLOAD (object);
+  priv = depayload->priv;
 
   switch (prop_id) {
     case PROP_SOURCE_INFO:
       gst_rtp_base_depayload_set_source_info_enabled (depayload,
           g_value_get_boolean (value));
       break;
+    case PROP_MAX_REORDER:
+      priv->max_reorder = g_value_get_int (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -1056,8 +1085,10 @@ gst_rtp_base_depayload_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec)
 {
   GstRTPBaseDepayload *depayload;
+  GstRTPBaseDepayloadPrivate *priv;
 
   depayload = GST_RTP_BASE_DEPAYLOAD (object);
+  priv = depayload->priv;
 
   switch (prop_id) {
     case PROP_STATS:
@@ -1068,6 +1099,9 @@ gst_rtp_base_depayload_get_property (GObject * object, guint prop_id,
       g_value_set_boolean (value,
           gst_rtp_base_depayload_is_source_info_enabled (depayload));
       break;
+    case PROP_MAX_REORDER:
+      g_value_set_int (value, priv->max_reorder);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
index b94273c..cd7decb 100644 (file)
@@ -1412,6 +1412,59 @@ GST_START_TEST (rtp_base_depayload_source_info_from_rtp_only)
 
 GST_END_TEST;
 
+/* Test max-reorder property. Reordered packets with a gap less than
+ * max-reordered will be dropped, reordered packets with gap larger than
+ * max-reorder is considered coming fra a restarted sender and should not be
+ * dropped. */
+GST_START_TEST (rtp_base_depayload_max_reorder)
+{
+  GstHarness *h;
+  GstRtpDummyDepay *depay;
+  guint seq = 1000;
+
+  depay = rtp_dummy_depay_new ();
+  h = gst_harness_new_with_element (GST_ELEMENT_CAST (depay), "sink", "src");
+  gst_harness_set_src_caps_str (h, "application/x-rtp");
+
+#define PUSH_AND_CHECK(seqnum, pushed) G_STMT_START {                   \
+    GstBuffer *buffer = gst_rtp_buffer_new_allocate (0, 0, 0);          \
+    rtp_buffer_set (buffer, "seq", seqnum, "ssrc", 0x11, NULL);         \
+    fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (h, buffer)); \
+    fail_unless_equals_int (gst_harness_buffers_in_queue (h), pushed);  \
+    if (pushed)                                                         \
+      gst_buffer_unref (gst_harness_pull (h));                          \
+  } G_STMT_END;
+
+  /* By default some reordering is accepted. Old seqnums should be
+   * dropped, but not too old */
+  PUSH_AND_CHECK (seq, TRUE);
+  PUSH_AND_CHECK (seq - 50, FALSE);
+  PUSH_AND_CHECK (seq - 100, TRUE);
+
+  /* Update property to allow less reordering */
+  g_object_set (depay, "max-reorder", 3, NULL);
+
+  /* Gaps up to max allowed reordering is dropped. */
+  PUSH_AND_CHECK (seq, TRUE);
+  PUSH_AND_CHECK (seq - 2, FALSE);
+  PUSH_AND_CHECK (seq - 3, TRUE);
+
+  /* After a push the initial state should be reset, so a duplicate of the
+   * last packet should be dropped */
+  PUSH_AND_CHECK (seq - 3, FALSE);
+
+  /* Update property to minimum value. Should never drop buffers. */
+  g_object_set (depay, "max-reorder", 0, NULL);
+
+  /* Duplicate buffer should now be pushed. */
+  PUSH_AND_CHECK (seq, TRUE);
+  PUSH_AND_CHECK (seq, TRUE);
+
+  g_object_unref (depay);
+  gst_harness_teardown (h);
+}
+
+GST_END_TEST;
 
 static Suite *
 rtp_basepayloading_suite (void)
@@ -1445,6 +1498,7 @@ rtp_basepayloading_suite (void)
 
   tcase_add_test (tc_chain, rtp_base_depayload_source_info_test);
   tcase_add_test (tc_chain, rtp_base_depayload_source_info_from_rtp_only);
+  tcase_add_test (tc_chain, rtp_base_depayload_max_reorder);
 
   return s;
 }