rtpjitterbuffer: max-dropout-time gets cast to int32
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
index 2fbbd4b..e8b636f 100644 (file)
@@ -47,7 +47,7 @@
  * depayloader or other element to create concealment data or some other logic
  * to gracefully handle the missing packets.
  *
- * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
+ * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incoming
  * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
  * buffer.
  *
@@ -69,7 +69,7 @@
  *
  * - If seqnum N arrived, all seqnum older than
  *     N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
- *     immediately. This is to request fast feedback for abonormally reorder
+ *     immediately. This is to request fast feedback for abnormally reorder
  *     packets before any of the previous timeouts is triggered.
  *
  * A late packet triggers the GstRTPRetransmissionRequest custom upstream
@@ -200,6 +200,20 @@ enum
     (g_mutex_unlock (&(priv)->jbuf_lock));                     \
 } G_STMT_END
 
+#define JBUF_WAIT_QUEUE(priv)   G_STMT_START {            \
+  GST_DEBUG ("waiting queue");                            \
+  (priv)->waiting_queue++;                                \
+  g_cond_wait (&(priv)->jbuf_queue, &(priv)->jbuf_lock);  \
+  (priv)->waiting_queue--;                                \
+  GST_DEBUG ("waiting queue done");                       \
+} G_STMT_END
+#define JBUF_SIGNAL_QUEUE(priv) G_STMT_START {            \
+  if (G_UNLIKELY ((priv)->waiting_queue)) {               \
+    GST_DEBUG ("signal queue, %d waiters", (priv)->waiting_queue); \
+    g_cond_signal (&(priv)->jbuf_queue);                  \
+  }                                                       \
+} G_STMT_END
+
 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
   GST_DEBUG ("waiting timer");                            \
   (priv)->waiting_timer++;                                \
@@ -263,6 +277,8 @@ struct _GstRtpJitterBufferPrivate
 
   RTPJitterBuffer *jbuf;
   GMutex jbuf_lock;
+  gboolean waiting_queue;
+  GCond jbuf_queue;
   gboolean waiting_timer;
   GCond jbuf_timer;
   gboolean waiting_event;
@@ -274,6 +290,7 @@ struct _GstRtpJitterBufferPrivate
   gboolean ts_discont;
   gboolean active;
   guint64 out_offset;
+  guint32 segment_seqnum;
 
   gboolean timer_running;
   GThread *timer_thread;
@@ -668,7 +685,7 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
           "Sending retransmission event when this much reordering "
-          "(0 disable, -1 automatic)",
+          "(0 disable)",
           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   /**
@@ -767,7 +784,7 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
       g_param_spec_uint ("max-dropout-time", "Max dropout time",
           "The maximum time (milliseconds) of missing packets tolerated.",
-          0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
+          0, G_MAXINT32, DEFAULT_MAX_DROPOUT_TIME,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
@@ -982,6 +999,7 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
 
   GST_DEBUG_CATEGORY_INIT
       (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
+  GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_jitter_buffer_chain_rtcp);
 }
 
 static void
@@ -1019,10 +1037,12 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
   priv->last_pts = -1;
   priv->last_rtptime = -1;
   priv->avg_jitter = 0;
+  priv->segment_seqnum = GST_SEQNUM_INVALID;
   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
   priv->rtx_stats_timers = timer_queue_new ();
   priv->jbuf = rtp_jitter_buffer_new ();
   g_mutex_init (&priv->jbuf_lock);
+  g_cond_init (&priv->jbuf_queue);
   g_cond_init (&priv->jbuf_timer);
   g_cond_init (&priv->jbuf_event);
   g_cond_init (&priv->jbuf_query);
@@ -1128,6 +1148,7 @@ gst_rtp_jitter_buffer_finalize (GObject * object)
   g_array_free (priv->timers, TRUE);
   timer_queue_free (priv->rtx_stats_timers);
   g_mutex_clear (&priv->jbuf_lock);
+  g_cond_clear (&priv->jbuf_queue);
   g_cond_clear (&priv->jbuf_timer);
   g_cond_clear (&priv->jbuf_event);
   g_cond_clear (&priv->jbuf_query);
@@ -1576,6 +1597,7 @@ gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
   /* this unblocks any waiting pops on the src pad task */
   JBUF_SIGNAL_EVENT (priv);
   JBUF_SIGNAL_QUERY (priv, FALSE);
+  JBUF_SIGNAL_QUEUE (priv);
   JBUF_UNLOCK (priv);
 }
 
@@ -1610,6 +1632,7 @@ gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
   priv->last_rtptime = -1;
   priv->last_in_pts = 0;
   priv->equidistant = 0;
+  priv->segment_seqnum = GST_SEQNUM_INVALID;
   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
   rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
@@ -1680,6 +1703,7 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
       /* block until we go to PLAYING */
       priv->blocked = TRUE;
       priv->timer_running = TRUE;
+      priv->srcresult = GST_FLOW_OK;
       priv->timer_thread =
           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
       JBUF_UNLOCK (priv);
@@ -1718,9 +1742,11 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
       JBUF_LOCK (priv);
       gst_buffer_replace (&priv->last_sr, NULL);
       priv->timer_running = FALSE;
+      priv->srcresult = GST_FLOW_FLUSHING;
       unschedule_current_timer (jitterbuffer);
       JBUF_SIGNAL_TIMER (priv);
       JBUF_SIGNAL_QUERY (priv, FALSE);
+      JBUF_SIGNAL_QUEUE (priv);
       JBUF_UNLOCK (priv);
       g_thread_join (priv->timer_thread);
       priv->timer_thread = NULL;
@@ -1801,6 +1827,8 @@ queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
       GstSegment segment;
       gst_event_copy_segment (event, &segment);
 
+      priv->segment_seqnum = gst_event_get_seqnum (event);
+
       /* we need time for now */
       if (segment.format != GST_FORMAT_TIME) {
         GST_DEBUG_OBJECT (jitterbuffer, "ignoring non-TIME newsegment");
@@ -1808,6 +1836,7 @@ queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
 
         gst_segment_init (&segment, GST_FORMAT_TIME);
         event = gst_event_new_segment (&segment);
+        gst_event_set_seqnum (event, priv->segment_seqnum);
       }
 
       priv->segment = segment;
@@ -3125,6 +3154,17 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
       timer->num_rtx_received++;
   }
 
+  /* At 2^15, we would detect a seqnum rollover too early, therefore
+   * limit the queue size. But let's not limit it to a number that is
+   * too small to avoid emptying it needlessly if there is a spurious huge
+   * sequence number, let's allow at least 10k packets in any case. */
+  while (rtp_jitter_buffer_get_seqnum_diff (priv->jbuf) >= 32765 &&
+      rtp_jitter_buffer_num_packets (priv->jbuf) > 10000 &&
+      priv->srcresult == GST_FLOW_OK)
+    JBUF_WAIT_QUEUE (priv);
+  if (priv->srcresult != GST_FLOW_OK)
+    goto out_flushing;
+
   /* let's check if this buffer is too late, we can only accept packets with
    * bigger seqnum than the one we last pushed. */
   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
@@ -3940,8 +3980,13 @@ do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
   remove_timer (jitterbuffer, timer);
   if (!priv->eos) {
+    GstEvent *event;
+
     /* there was no EOS in the buffer, put one in there now */
-    queue_event (jitterbuffer, gst_event_new_eos ());
+    event = gst_event_new_eos ();
+    if (priv->segment_seqnum != GST_SEQNUM_INVALID)
+      gst_event_set_seqnum (event, priv->segment_seqnum);
+    queue_event (jitterbuffer, event);
   }
   JBUF_SIGNAL_EVENT (priv);
 
@@ -4179,6 +4224,7 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
   JBUF_LOCK_CHECK (priv, flushing);
   do {
     result = handle_next_buffer (jitterbuffer);
+    JBUF_SIGNAL_QUEUE (priv);
     if (G_LIKELY (result == GST_FLOW_WAIT)) {
       /* now wait for the next event */
       JBUF_WAIT_EVENT (priv, flushing);
@@ -4208,6 +4254,8 @@ pause:
     gst_pad_pause_task (priv->srcpad);
     if (result == GST_FLOW_EOS) {
       event = gst_event_new_eos ();
+      if (priv->segment_seqnum != GST_SEQNUM_INVALID)
+        gst_event_set_seqnum (event, priv->segment_seqnum);
       gst_pad_push_event (priv->srcpad, event);
     }
     return;