rtpjitterbuffer: When detecting a huge seqnum gap, wait for 5 consecutive packets...
authorSebastian Dröge <sebastian@centricular.com>
Wed, 22 Apr 2015 16:54:06 +0000 (18:54 +0200)
committerSebastian Dröge <sebastian@centricular.com>
Mon, 18 May 2015 15:43:15 +0000 (18:43 +0300)
It might just be a late retransmission or spurious packet from elsewhere, but
resetting everything would mean that we will cause a noticeable hickup. Let's
get some confidence first that the sequence numbers changed for whatever
reason.

https://bugzilla.gnome.org/show_bug.cgi?id=747922

gst/rtpmanager/gstrtpjitterbuffer.c

index 547bef5..dd99f6b 100644 (file)
@@ -269,6 +269,8 @@ struct _GstRtpJitterBufferPrivate
   guint64 ips_rtptime;
   GstClockTime packet_spacing;
 
+  GQueue gap_packets;
+
   /* the next expected seqnum we receive */
   GstClockTime last_in_dts;
   guint32 last_in_seqnum;
@@ -831,6 +833,7 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
   g_cond_init (&priv->jbuf_timer);
   g_cond_init (&priv->jbuf_event);
   g_cond_init (&priv->jbuf_query);
+  g_queue_init (&priv->gap_packets);
 
   /* reset skew detection initialy */
   rtp_jitter_buffer_reset_skew (priv->jbuf);
@@ -931,6 +934,8 @@ gst_rtp_jitter_buffer_finalize (GObject * object)
   g_cond_clear (&priv->jbuf_query);
 
   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
+  g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
+  g_queue_clear (&priv->gap_packets);
   g_object_unref (priv->jbuf);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
@@ -1314,6 +1319,8 @@ gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
   rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
   rtp_jitter_buffer_reset_skew (priv->jbuf);
   remove_all_timers (jitterbuffer);
+  g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
+  g_queue_clear (&priv->gap_packets);
   JBUF_UNLOCK (priv);
 }
 
@@ -2223,6 +2230,97 @@ no_time:
   }
 }
 
+static gint
+compare_buffer_seqnum (GstBuffer * a, GstBuffer * b, gpointer user_data)
+{
+  GstRTPBuffer rtp_a = GST_RTP_BUFFER_INIT;
+  GstRTPBuffer rtp_b = GST_RTP_BUFFER_INIT;
+  guint seq_a, seq_b;
+
+  gst_rtp_buffer_map (a, GST_MAP_READ, &rtp_a);
+  seq_a = gst_rtp_buffer_get_seq (&rtp_a);
+  gst_rtp_buffer_unmap (&rtp_a);
+
+  gst_rtp_buffer_map (b, GST_MAP_READ, &rtp_b);
+  seq_b = gst_rtp_buffer_get_seq (&rtp_b);
+  gst_rtp_buffer_unmap (&rtp_b);
+
+  return gst_rtp_buffer_compare_seqnum (seq_b, seq_a);
+}
+
+static gboolean
+handle_big_gap_buffer (GstRtpJitterBuffer * jitterbuffer, gboolean future,
+    GstBuffer * buffer, guint8 pt, guint16 seqnum, gint gap)
+{
+  GstRtpJitterBufferPrivate *priv;
+  guint gap_packets_length;
+  gboolean reset = FALSE;
+
+  priv = jitterbuffer->priv;
+
+  if ((gap_packets_length = g_queue_get_length (&priv->gap_packets)) > 0) {
+    GList *l;
+    guint32 prev_gap_seq = -1;
+    gboolean all_consecutive = TRUE;
+
+    g_queue_insert_sorted (&priv->gap_packets, buffer,
+        (GCompareDataFunc) compare_buffer_seqnum, NULL);
+
+    for (l = priv->gap_packets.head; l; l = l->next) {
+      GstBuffer *gap_buffer = l->data;
+      GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
+      guint32 gap_seq;
+
+      gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
+
+      all_consecutive = (gst_rtp_buffer_get_payload_type (&gap_rtp) == pt);
+
+      gap_seq = gst_rtp_buffer_get_seq (&gap_rtp);
+      if (prev_gap_seq == -1)
+        prev_gap_seq = gap_seq;
+      else if (gst_rtp_buffer_compare_seqnum (gap_seq, prev_gap_seq) != -1)
+        all_consecutive = FALSE;
+      else
+        prev_gap_seq = gap_seq;
+
+      gst_rtp_buffer_unmap (&gap_rtp);
+      if (!all_consecutive)
+        break;
+    }
+
+    if (all_consecutive && gap_packets_length > 3) {
+      GST_DEBUG_OBJECT (jitterbuffer,
+          "buffer too %s %d < %d, got 5 consecutive ones - reset",
+          (future ? "new" : "old"), gap,
+          (future ? RTP_MAX_DROPOUT : -RTP_MAX_MISORDER));
+      reset = TRUE;
+    } else if (!all_consecutive) {
+      g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
+      g_queue_clear (&priv->gap_packets);
+      GST_DEBUG_OBJECT (jitterbuffer,
+          "buffer too %s %d < %d, got no 5 consecutive ones - dropping",
+          (future ? "new" : "old"), gap,
+          (future ? RTP_MAX_DROPOUT : -RTP_MAX_MISORDER));
+      buffer = NULL;
+    } else {
+      GST_DEBUG_OBJECT (jitterbuffer,
+          "buffer too %s %d < %d, got %u consecutive ones - waiting",
+          (future ? "new" : "old"), gap,
+          (future ? RTP_MAX_DROPOUT : -RTP_MAX_MISORDER),
+          gap_packets_length + 1);
+      buffer = NULL;
+    }
+  } else {
+    GST_DEBUG_OBJECT (jitterbuffer,
+        "buffer too %s %d < %d, first one - waiting", (future ? "new" : "old"),
+        gap, -RTP_MAX_MISORDER);
+    g_queue_push_tail (&priv->gap_packets, buffer);
+    buffer = NULL;
+  }
+
+  return reset;
+}
+
 static GstFlowReturn
 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
     GstBuffer * buffer)
@@ -2355,21 +2453,21 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
         goto gap_but_no_dts;
       } else if (gap < 0) {
         /* we received an old packet */
-        if (G_UNLIKELY (gap < -RTP_MAX_MISORDER)) {
-          /* too old packet, reset */
-          GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d < %d", gap,
-              -RTP_MAX_MISORDER);
-          reset = TRUE;
+        if (G_UNLIKELY (gap != -1 && gap < -RTP_MAX_MISORDER)) {
+          reset =
+              handle_big_gap_buffer (jitterbuffer, FALSE, buffer, pt, seqnum,
+              gap);
+          buffer = NULL;
         } else {
           GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
         }
       } else {
         /* new packet, we are missing some packets */
-        if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
-          /* packet too far in future, reset */
-          GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too new %d > %d", gap,
-              RTP_MAX_DROPOUT);
-          reset = TRUE;
+        if (G_UNLIKELY (gap >= RTP_MAX_DROPOUT)) {
+          reset =
+              handle_big_gap_buffer (jitterbuffer, TRUE, buffer, pt, seqnum,
+              gap);
+          buffer = NULL;
         } else {
           GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
           /* fill in the gap with EXPECTED timers */
@@ -2380,6 +2478,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
       }
       if (G_UNLIKELY (reset)) {
         GList *events = NULL, *l;
+        GList *buffers;
 
         GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
         rtp_jitter_buffer_flush (priv->jbuf,
@@ -2389,7 +2488,10 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
         priv->discont = TRUE;
         priv->last_popped_seqnum = -1;
         priv->next_seqnum = seqnum;
-        do_next_seqnum = TRUE;
+
+        priv->last_in_seqnum = -1;
+        priv->last_in_dts = -1;
+        priv->next_in_seqnum = -1;
 
         /* Insert all sticky events again in order, otherwise we would
          * potentially loose STREAM_START, CAPS or SEGMENT events
@@ -2404,6 +2506,29 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
         g_list_free (events);
 
         JBUF_SIGNAL_EVENT (priv);
+
+        /* reset spacing estimation when gap */
+        priv->ips_rtptime = -1;
+        priv->ips_dts = GST_CLOCK_TIME_NONE;
+
+        buffers = g_list_copy (priv->gap_packets.head);
+        g_queue_clear (&priv->gap_packets);
+
+        priv->ips_rtptime = -1;
+        priv->ips_dts = GST_CLOCK_TIME_NONE;
+        JBUF_UNLOCK (jitterbuffer->priv);
+
+        for (l = buffers; l; l = l->next) {
+          ret = gst_rtp_jitter_buffer_chain (pad, parent, l->data);
+          l->data = NULL;
+          if (ret != GST_FLOW_OK)
+            break;
+        }
+        for (; l; l = l->next)
+          gst_buffer_unref (l->data);
+        g_list_free (buffers);
+
+        return ret;
       }
       /* reset spacing estimation when gap */
       priv->ips_rtptime = -1;
@@ -2420,6 +2545,19 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
     priv->ips_rtptime = rtptime;
     priv->ips_dts = dts;
   }
+
+  /* We had no huge gap, let's drop all the gap packets */
+  if (buffer != NULL) {
+    GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets");
+    g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
+    g_queue_clear (&priv->gap_packets);
+  } else {
+    GST_DEBUG_OBJECT (jitterbuffer,
+        "Had big gap, waiting for more consecutive packets");
+    JBUF_UNLOCK (jitterbuffer->priv);
+    return GST_FLOW_OK;
+  }
+
   if (do_next_seqnum) {
     priv->last_in_seqnum = seqnum;
     priv->last_in_dts = dts;
@@ -2747,6 +2885,14 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
       break;
     case ITEM_TYPE_LOST:
     case ITEM_TYPE_EVENT:
+      /* We got not enough consecutive packets with a huge gap, we can
+       * as well just drop them here now on EOS */
+      if (GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
+        GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets on EOS");
+        g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
+        g_queue_clear (&priv->gap_packets);
+      }
+
       GST_DEBUG_OBJECT (jitterbuffer, "%sPushing event %" GST_PTR_FORMAT
           ", seqnum %d", do_push ? "" : "NOT ", outevent, seqnum);