rtpjitterbuffer: immediately insert a lost-event on multiple lost packets
authorHavard Graff <havard.graff@gmail.com>
Thu, 19 Mar 2020 22:12:04 +0000 (23:12 +0100)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Fri, 20 Mar 2020 13:17:20 +0000 (13:17 +0000)
There is a problem with the code today, where a single timer will
be scheduled for a series of lost packets, and then if the first packet
in that series arrives, it will cause a rescheduling of that timer, going
from a "multi"-timer to a single-timer, causing a lot of the packets
in that timer to be unaccounted for, and creating a situation in where
the jitterbuffer will never again push out another packet.

This patch solves the problem by instead of scheduling those lost packets
as another timer, it instead asks to have that lost-event pushed straight
out.

This very much goes with the intent of the code here: These packets are
so desperately late that no cure exists, and we might as well get the
lost-event out of the way and get on with it.

This change has some interesting knock-on effect being presented in
later commits. It completely removes the concept of "already-lost", so
that is why that test has been disabled in this commit, to be
removed later.

gst/rtpmanager/gstrtpjitterbuffer.c
tests/check/elements/rtpjitterbuffer.c

index 6fee3de..8c2a094 100644 (file)
@@ -2479,10 +2479,11 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
   GstClockTime duration, expected_pts;
   gboolean equidistant = priv->equidistant > 0;
+  GstClockTime last_in_pts = priv->last_in_pts;
 
   GST_DEBUG_OBJECT (jitterbuffer,
       "pts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (pts), GST_TIME_ARGS (priv->last_in_pts));
+      GST_TIME_ARGS (pts), GST_TIME_ARGS (last_in_pts));
 
   if (pts == GST_CLOCK_TIME_NONE) {
     GST_WARNING_OBJECT (jitterbuffer, "Have no PTS");
@@ -2492,8 +2493,8 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
   if (equidistant) {
     GstClockTime total_duration;
     /* the total duration spanned by the missing packets */
-    if (pts >= priv->last_in_pts)
-      total_duration = pts - priv->last_in_pts;
+    if (pts >= last_in_pts)
+      total_duration = pts - last_in_pts;
     else
       total_duration = 0;
 
@@ -2530,18 +2531,30 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
           GST_TIME_ARGS (priv->latency_ns), lost_packets,
           GST_TIME_ARGS (gap_time));
 
-      /* this timer will fire immediately and the lost event will be pushed from
-       * the timer thread */
+      /* this multi-lost-packet event will be inserted directly into the packet-queue
+         for immediate processing */
       if (lost_packets > 0) {
-        rtp_timer_queue_set_lost (priv->timers, expected, lost_packets,
-            priv->last_in_pts + duration, gap_time,
-            timeout_offset (jitterbuffer));
+        RtpTimer *timer;
+        GstClockTime timestamp =
+            apply_offset (jitterbuffer, last_in_pts + duration);
+        insert_lost_event (jitterbuffer, expected, lost_packets, timestamp,
+            gap_time, 0);
+
+        timer = rtp_timer_queue_find (priv->timers, expected);
+        if (timer && timer->type == RTP_TIMER_EXPECTED) {
+          if (timer->queued)
+            rtp_timer_queue_unschedule (priv->timers, timer);
+          GST_DEBUG_OBJECT (jitterbuffer, "removing timer for seqnum #%u",
+              expected);
+          rtp_timer_free (timer);
+        }
+
         expected += lost_packets;
-        priv->last_in_pts += gap_time;
+        last_in_pts += gap_time;
       }
     }
 
-    expected_pts = priv->last_in_pts + duration;
+    expected_pts = last_in_pts + duration;
   } else {
     /* If we cannot assume equidistant packet spacing, the only thing we now
      * for sure is that the missing packets have expected pts not later than
index a8ce9c3..fd4d271 100644 (file)
@@ -3096,6 +3096,71 @@ GST_START_TEST (test_reset_does_not_stall)
 
 GST_END_TEST;
 
+typedef struct
+{
+  guint16 seqnum;
+  guint32 rtptime;
+  gint sleep_ms;
+} PushBufferCtx;
+
+GST_START_TEST (test_multiple_lost_do_not_stall)
+{
+  GstHarness *h = gst_harness_new ("rtpjitterbuffer");
+  gint latency_ms = 200;
+  guint inital_bufs = latency_ms / TEST_BUF_MS;
+  guint max_dropout_time = 10;
+  guint16 i;
+  guint16 seqnum = 1000;
+  guint32 rtptime = seqnum * TEST_RTP_TS_DURATION;
+  guint in_queue;
+  PushBufferCtx bufs[] = {
+    {1039, 166560, 58},
+    {1011, 161280, 1000},
+  };
+  gint size = G_N_ELEMENTS (bufs);
+
+  gst_harness_use_systemclock (h);
+  gst_harness_set_src_caps (h, generate_caps ());
+
+  g_object_set (h->element, "latency", latency_ms, "do-retransmission", TRUE,
+      "do-lost", TRUE, "rtx-max-retries", 2,
+      "max-dropout-time", max_dropout_time, NULL);
+
+  /* push initial buffers and pull them out as well */
+  for (i = 0; i < inital_bufs; i++) {
+    seqnum += 1;
+    rtptime += TEST_RTP_TS_DURATION;
+    push_test_buffer_now (h, seqnum, rtptime);
+    g_usleep (G_USEC_PER_SEC / 1000 * 20);
+  }
+  for (i = 0; i < inital_bufs; i++) {
+    gst_buffer_unref (gst_harness_pull (h));
+  }
+
+  /* push buffers according to list */
+  for (i = 0; i < size; i++) {
+    push_test_buffer_now (h, bufs[i].seqnum, bufs[i].rtptime);
+    g_usleep (G_USEC_PER_SEC / 1000 * bufs[i].sleep_ms);
+    seqnum = MAX (bufs[i].seqnum, seqnum);
+  }
+
+  in_queue = gst_harness_buffers_in_queue (h);
+
+  /* and then normal buffers again */
+  for (i = 0; i < 5; i++) {
+    seqnum += 1;
+    push_test_buffer_now (h, seqnum, seqnum * TEST_RTP_TS_DURATION);
+    g_usleep (G_USEC_PER_SEC / 1000 * 20);
+  }
+
+  /* we expect at least some of those buffers to come through */
+  fail_unless (gst_harness_buffers_in_queue (h) != in_queue);
+
+  gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
 
 static Suite *
 rtpjitterbuffer_suite (void)
@@ -3161,11 +3226,12 @@ rtpjitterbuffer_suite (void)
   tcase_add_test (tc_chain, test_performance);
 
   tcase_add_test (tc_chain, test_drop_messages_too_late);
-  tcase_add_test (tc_chain, test_drop_messages_already_lost);
+  tcase_skip_broken_test (tc_chain, test_drop_messages_already_lost);
   tcase_add_test (tc_chain, test_drop_messages_drop_on_latency);
   tcase_add_test (tc_chain, test_drop_messages_interval);
 
   tcase_add_test (tc_chain, test_reset_does_not_stall);
+  tcase_add_test (tc_chain, test_multiple_lost_do_not_stall);
 
   return s;
 }