rtpjitterbuffer: Limit size to 2^15 packets
authorOlivier Crête <olivier.crete@collabora.com>
Fri, 11 Jan 2019 22:53:43 +0000 (17:53 -0500)
committerOlivier Crête <olivier.crete@ocrete.ca>
Mon, 11 Feb 2019 23:41:14 +0000 (23:41 +0000)
If it goes over 2^15 packets, it will think it has rolled over
and start dropping all packets. So make sure the seqnum distance is not too big.

But let's not limit it to a number that is too small to avoid emptying it
needlessly if there is a spurious huge sequence number, let's allow at
least 10k packets in any case.

gst/rtpmanager/gstrtpjitterbuffer.c
gst/rtpmanager/rtpjitterbuffer.c
gst/rtpmanager/rtpjitterbuffer.h

index 9909af3..5ec9eee 100644 (file)
@@ -200,6 +200,20 @@ enum
     (g_mutex_unlock (&(priv)->jbuf_lock));                     \
 } G_STMT_END
 
+#define JBUF_WAIT_QUEUE(priv)   G_STMT_START {            \
+  GST_DEBUG ("waiting queue");                            \
+  (priv)->waiting_queue++;                                \
+  g_cond_wait (&(priv)->jbuf_queue, &(priv)->jbuf_lock);  \
+  (priv)->waiting_queue--;                                \
+  GST_DEBUG ("waiting queue done");                       \
+} G_STMT_END
+#define JBUF_SIGNAL_QUEUE(priv) G_STMT_START {            \
+  if (G_UNLIKELY ((priv)->waiting_queue)) {               \
+    GST_DEBUG ("signal queue, %d waiters", (priv)->waiting_queue); \
+    g_cond_signal (&(priv)->jbuf_queue);                  \
+  }                                                       \
+} G_STMT_END
+
 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
   GST_DEBUG ("waiting timer");                            \
   (priv)->waiting_timer++;                                \
@@ -263,6 +277,8 @@ struct _GstRtpJitterBufferPrivate
 
   RTPJitterBuffer *jbuf;
   GMutex jbuf_lock;
+  gboolean waiting_queue;
+  GCond jbuf_queue;
   gboolean waiting_timer;
   GCond jbuf_timer;
   gboolean waiting_event;
@@ -1025,6 +1041,7 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
   priv->rtx_stats_timers = timer_queue_new ();
   priv->jbuf = rtp_jitter_buffer_new ();
   g_mutex_init (&priv->jbuf_lock);
+  g_cond_init (&priv->jbuf_queue);
   g_cond_init (&priv->jbuf_timer);
   g_cond_init (&priv->jbuf_event);
   g_cond_init (&priv->jbuf_query);
@@ -1130,6 +1147,7 @@ gst_rtp_jitter_buffer_finalize (GObject * object)
   g_array_free (priv->timers, TRUE);
   timer_queue_free (priv->rtx_stats_timers);
   g_mutex_clear (&priv->jbuf_lock);
+  g_cond_clear (&priv->jbuf_queue);
   g_cond_clear (&priv->jbuf_timer);
   g_cond_clear (&priv->jbuf_event);
   g_cond_clear (&priv->jbuf_query);
@@ -1578,6 +1596,7 @@ gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
   /* this unblocks any waiting pops on the src pad task */
   JBUF_SIGNAL_EVENT (priv);
   JBUF_SIGNAL_QUERY (priv, FALSE);
+  JBUF_SIGNAL_QUEUE (priv);
   JBUF_UNLOCK (priv);
 }
 
@@ -1683,6 +1702,7 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
       /* block until we go to PLAYING */
       priv->blocked = TRUE;
       priv->timer_running = TRUE;
+      priv->srcresult = GST_FLOW_OK;
       priv->timer_thread =
           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
       JBUF_UNLOCK (priv);
@@ -1721,9 +1741,11 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
       JBUF_LOCK (priv);
       gst_buffer_replace (&priv->last_sr, NULL);
       priv->timer_running = FALSE;
+      priv->srcresult = GST_FLOW_FLUSHING;
       unschedule_current_timer (jitterbuffer);
       JBUF_SIGNAL_TIMER (priv);
       JBUF_SIGNAL_QUERY (priv, FALSE);
+      JBUF_SIGNAL_QUEUE (priv);
       JBUF_UNLOCK (priv);
       g_thread_join (priv->timer_thread);
       priv->timer_thread = NULL;
@@ -3131,6 +3153,17 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
       timer->num_rtx_received++;
   }
 
+  /* At 2^15, we would detect a seqnum rollover too early, therefore
+   * limit the queue size. But let's not limit it to a number that is
+   * too small to avoid emptying it needlessly if there is a spurious huge
+   * sequence number, let's allow at least 10k packets in any case. */
+  while (rtp_jitter_buffer_get_seqnum_diff (priv->jbuf) >= 32765 &&
+      rtp_jitter_buffer_num_packets (priv->jbuf) > 10000 &&
+      priv->srcresult == GST_FLOW_OK)
+    JBUF_WAIT_QUEUE (priv);
+  if (priv->srcresult != GST_FLOW_OK)
+    goto out_flushing;
+
   /* let's check if this buffer is too late, we can only accept packets with
    * bigger seqnum than the one we last pushed. */
   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
@@ -4190,6 +4223,7 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
   JBUF_LOCK_CHECK (priv, flushing);
   do {
     result = handle_next_buffer (jitterbuffer);
+    JBUF_SIGNAL_QUEUE (priv);
     if (G_LIKELY (result == GST_FLOW_WAIT)) {
       /* now wait for the next event */
       JBUF_WAIT_EVENT (priv, flushing);
index f1187e2..0308a53 100644 (file)
@@ -1227,6 +1227,49 @@ rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer * jbuf)
   return result;
 }
 
+
+/**
+ * rtp_jitter_buffer_get_seqnum_diff:
+ * @jbuf: an #RTPJitterBuffer
+ *
+ * Get the difference between the seqnum of first and last packet in the
+ * jitterbuffer.
+ *
+ * Returns: The difference expressed in seqnum.
+ */
+guint16
+rtp_jitter_buffer_get_seqnum_diff (RTPJitterBuffer * jbuf)
+{
+  guint32 high_seqnum, low_seqnum;
+  RTPJitterBufferItem *high_buf, *low_buf;
+  guint16 result;
+
+  g_return_val_if_fail (jbuf != NULL, 0);
+
+  high_buf = (RTPJitterBufferItem *) g_queue_peek_tail_link (jbuf->packets);
+  low_buf = (RTPJitterBufferItem *) g_queue_peek_head_link (jbuf->packets);
+
+  while (high_buf && high_buf->seqnum == -1)
+    high_buf = (RTPJitterBufferItem *) high_buf->prev;
+
+  while (low_buf && low_buf->seqnum == -1)
+    low_buf = (RTPJitterBufferItem *) low_buf->next;
+
+  if (!high_buf || !low_buf || high_buf == low_buf)
+    return 0;
+
+  high_seqnum = high_buf->seqnum;
+  low_seqnum = low_buf->seqnum;
+
+  /* it needs to work if ts wraps */
+  if (high_seqnum >= low_seqnum) {
+    result = (guint32) (high_seqnum - low_seqnum);
+  } else {
+    result = (guint32) (high_seqnum + G_MAXUINT16 + 1 - low_seqnum);
+  }
+  return result;
+}
+
 /**
  * rtp_jitter_buffer_get_sync:
  * @jbuf: an #RTPJitterBuffer
index 16db644..b65b603 100644 (file)
@@ -184,6 +184,7 @@ gint                  rtp_jitter_buffer_get_percent      (RTPJitterBuffer * jbuf
 
 guint                 rtp_jitter_buffer_num_packets      (RTPJitterBuffer *jbuf);
 guint32               rtp_jitter_buffer_get_ts_diff      (RTPJitterBuffer *jbuf);
+guint16               rtp_jitter_buffer_get_seqnum_diff  (RTPJitterBuffer * jbuf);
 
 void                  rtp_jitter_buffer_get_sync         (RTPJitterBuffer *jbuf, guint64 *rtptime,
                                                           guint64 *timestamp, guint32 *clock_rate,