rtpjitterbuffer: fix stalling when resetting timers
authorHavard Graff <havard@pexip.com>
Wed, 4 Mar 2020 10:17:16 +0000 (11:17 +0100)
committerHavard Graff <havard@pexip.com>
Wed, 4 Mar 2020 11:55:52 +0000 (12:55 +0100)
When calling gst_rtp_jitter_buffer_reset you pass in a seqnum.

This is considered the starting-point for a new stream.

However, the old behavior would unref this buffer, basically lying to
the thread that is pushing out buffers saying that it can expect
this buffer, when it would never arrive. The resulting effect being no
more buffer pushed out of the jitterbuffer, and it would buffer
incoming data indefinitely.

By instead inserting the buffer in the gap_packets queue, the _reset()
function will take responsibility for using that as the first buffer
of the new stream.

Fixes #703

gst/rtpmanager/gstrtpjitterbuffer.c
tests/check/elements/rtpjitterbuffer.c

index 770caa9..117d9e4 100644 (file)
@@ -3013,7 +3013,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
        * next packet */
       GST_WARNING_OBJECT (jitterbuffer, "%d pending timers > %d - resetting",
           rtp_timer_queue_length (priv->timers), max_dropout);
-      gst_buffer_unref (buffer);
+      g_queue_insert_sorted (&priv->gap_packets, buffer,
+          (GCompareDataFunc) compare_buffer_seqnum, NULL);
       return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum);
     }
 
index 92d96d1..3741eac 100644 (file)
@@ -532,6 +532,14 @@ push_test_buffer (GstHarness * h, guint seq_num)
           generate_test_buffer (seq_num)));
 }
 
+static void
+push_test_buffer_now (GstHarness * h, guint seqnum, guint32 rtptime)
+{
+  GstClockTime now = gst_clock_get_time (GST_ELEMENT_CLOCK (h->element));
+  fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (h,
+          generate_test_buffer_full (now, seqnum, rtptime)));
+}
+
 static gint
 get_rtp_seq_num (GstBuffer * buf)
 {
@@ -3340,6 +3348,58 @@ GST_START_TEST (test_drop_messages_interval)
 
 GST_END_TEST;
 
+GST_START_TEST (test_reset_does_not_stall)
+{
+  GstHarness *h = gst_harness_new ("rtpjitterbuffer");
+  gint latency_ms = 100;
+  guint inital_bufs = latency_ms / TEST_BUF_MS;
+  guint max_dropout_time = 100;
+  guint16 i;
+  guint16 seqnum = 0;
+  guint32 rtptime = 0;
+
+  gst_harness_use_systemclock (h);
+  gst_harness_set_src_caps (h, generate_caps ());
+
+  g_object_set (h->element, "latency", latency_ms, "do-retransmission", TRUE,
+      "do-lost", TRUE, "rtx-max-retries", 2,
+      "max-dropout-time", max_dropout_time, NULL);
+
+  /* push initial 5 buffers and pull them out as well */
+  for (i = 0; i < inital_bufs; i++) {
+    seqnum += 1;
+    rtptime += TEST_BUF_DURATION;
+    push_test_buffer_now (h, seqnum, rtptime);
+    g_usleep (G_USEC_PER_SEC / 1000 * 20);
+  }
+  for (i = 0; i < inital_bufs; i++) {
+    gst_buffer_unref (gst_harness_pull (h));
+  }
+
+  /* a big burst of buffers, with increasing gap size, but same rtptime
+     (typical I-frame), hoping to trigger the internal reset */
+  for (i = 0; i < 10; i++) {
+    seqnum += i + 1;
+    push_test_buffer_now (h, seqnum, rtptime);
+  }
+
+  /* and then normal buffers again */
+  for (i = 0; i < 20; i++) {
+    seqnum += 1;
+    rtptime += TEST_BUF_DURATION;
+    push_test_buffer_now (h, seqnum, rtptime);
+    g_usleep (G_USEC_PER_SEC / 1000 * 20);
+  }
+
+  /* we expect all those 20 sequential buffers to come through */
+  fail_unless (gst_harness_buffers_in_queue (h) >= 20);
+
+  gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+
 static Suite *
 rtpjitterbuffer_suite (void)
 {
@@ -3418,6 +3478,8 @@ rtpjitterbuffer_suite (void)
   tcase_add_test (tc_chain, test_drop_messages_drop_on_latency);
   tcase_add_test (tc_chain, test_drop_messages_interval);
 
+  tcase_add_test (tc_chain, test_reset_does_not_stall);
+
   return s;
 }