rtpjittterbuffer: Port timers array to RtpTimerQueue
authorNicolas Dufresne <nicolas.dufresne@collabora.com>
Tue, 18 Jun 2019 23:07:29 +0000 (19:07 -0400)
committerOlivier CrĂȘte <olivier.crete@collabora.com>
Fri, 27 Sep 2019 21:34:04 +0000 (17:34 -0400)
In this patch we now make use of the new RtpTimerQueue instead of the
old GArray. This required a lot of changes all over the place, some of
the important changes are that `timer->timeout` is no longer a PTS but
the actual timeout. This was required to get the RtpTimerQueue sorting
right. The applied offset is saved as `timer->offset`, this allow
retreiving back the PTS when needed.

The clockid updates only happens once per incoming packet. If the
currently schedule timer is before the earliest timer in the queue, we
no longer wakeup the thread. This way, if other timers get setup in the
meantime, this will reduce the number of wakup.

The timer loop code has been mostly rewritten, though the behaviour of
running the lost timers first has been kept (even though there is no
test to show what would be the side effect of doing this differently).

Fixes #608

gst/rtpmanager/gstrtpjitterbuffer.c
gst/rtpmanager/rtptimerqueue.h

index ecb5081..1ed9118 100644 (file)
@@ -333,7 +333,9 @@ struct _GstRtpJitterBufferPrivate
   GstClockTime last_in_pts;
   guint32 next_in_seqnum;
 
-  GArray *timers;
+  /* "normal" timers */
+  RtpTimerQueue *timers;
+  /* timers used for RTX statistics backlog */
   RtpTimerQueue *rtx_stats_timers;
 
   /* start and stop ranges */
@@ -479,7 +481,6 @@ gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
 
 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
-static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
 
 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
 
@@ -956,7 +957,7 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
   priv->last_rtptime = -1;
   priv->avg_jitter = 0;
   priv->segment_seqnum = GST_SEQNUM_INVALID;
-  priv->timers = g_array_new (FALSE, TRUE, sizeof (RtpTimer));
+  priv->timers = rtp_timer_queue_new ();
   priv->rtx_stats_timers = rtp_timer_queue_new ();
   priv->jbuf = rtp_jitter_buffer_new ();
   g_mutex_init (&priv->jbuf_lock);
@@ -1041,7 +1042,7 @@ gst_rtp_jitter_buffer_finalize (GObject * object)
   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
   priv = jitterbuffer->priv;
 
-  g_array_free (priv->timers, TRUE);
+  g_object_unref (priv->timers);
   g_object_unref (priv->rtx_stats_timers);
   g_mutex_clear (&priv->jbuf_lock);
   g_cond_clear (&priv->jbuf_queue);
@@ -1533,7 +1534,7 @@ gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
   rtp_jitter_buffer_flush (priv->jbuf, NULL, NULL);
   rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
   rtp_jitter_buffer_reset_skew (priv->jbuf);
-  remove_all_timers (jitterbuffer);
+  rtp_timer_queue_remove_all (priv->timers);
   g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
   g_queue_clear (&priv->gap_packets);
   JBUF_UNLOCK (priv);
@@ -1980,22 +1981,11 @@ apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
   return timestamp;
 }
 
-static RtpTimer *
-find_timer (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
+static GstClockTimeDiff
+timeout_offset (GstRtpJitterBuffer * jitterbuffer)
 {
   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
-  RtpTimer *timer = NULL;
-  gint i, len;
-
-  len = priv->timers->len;
-  for (i = 0; i < len; i++) {
-    RtpTimer *test = &g_array_index (priv->timers, RtpTimer, i);
-    if (test->seqnum == seqnum) {
-      timer = test;
-      break;
-    }
-  }
-  return timer;
+  return priv->ts_offset + priv->out_offset + priv->latency_ns;
 }
 
 static void
@@ -2011,172 +2001,43 @@ unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
 }
 
 static GstClockTime
-get_timeout (GstRtpJitterBuffer * jitterbuffer, const RtpTimer * timer)
+get_pts_timeout (const RtpTimer * timer)
 {
-  GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
-  GstClockTime test_timeout;
-
-  if ((test_timeout = timer->timeout) == -1)
+  if (timer->timeout == -1)
     return -1;
 
-  if (timer->type != RTP_TIMER_EXPECTED) {
-    /* add our latency and offset to get output times. */
-    test_timeout = apply_offset (jitterbuffer, test_timeout);
-    test_timeout += priv->latency_ns;
-  }
-  return test_timeout;
+  return timer->timeout - timer->offset;
 }
 
 static void
-recalculate_timer (GstRtpJitterBuffer * jitterbuffer, const RtpTimer * timer)
-{
-  GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
-
-  if (priv->clock_id) {
-    GstClockTime timeout = get_timeout (jitterbuffer, timer);
-
-    GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
-
-    if (timeout == -1 || timeout < priv->timer_timeout)
-      unschedule_current_timer (jitterbuffer);
-  }
-}
-
-static RtpTimer *
-add_timer (GstRtpJitterBuffer * jitterbuffer, RtpTimerType type,
-    guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
-    GstClockTime duration)
+update_current_timer (GstRtpJitterBuffer * jitterbuffer)
 {
   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
   RtpTimer *timer;
-  gint len;
-
-  GST_DEBUG_OBJECT (jitterbuffer,
-      "add timer %d for seqnum %d to %" GST_TIME_FORMAT ", delay %"
-      GST_TIME_FORMAT, type, seqnum, GST_TIME_ARGS (timeout),
-      GST_TIME_ARGS (delay));
-
-  len = priv->timers->len;
-  g_array_set_size (priv->timers, len + 1);
-  timer = &g_array_index (priv->timers, RtpTimer, len);
-  timer->idx = len;
-  timer->type = type;
-  timer->seqnum = seqnum;
-  timer->num = num;
-  timer->timeout = timeout + delay;
-  timer->duration = duration;
-  if (type == RTP_TIMER_EXPECTED) {
-    timer->rtx_base = timeout;
-    timer->rtx_delay = delay;
-    timer->rtx_retry = 0;
-  }
-  timer->rtx_last = GST_CLOCK_TIME_NONE;
-  timer->num_rtx_retry = 0;
-  timer->num_rtx_received = 0;
-  recalculate_timer (jitterbuffer, timer);
-  JBUF_SIGNAL_TIMER (priv);
-
-  return timer;
-}
-
-static void
-reschedule_timer (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
-    guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
-{
-  GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
-  gboolean seqchange, timechange;
-  guint16 oldseq;
-  GstClockTime new_timeout;
 
-  oldseq = timer->seqnum;
-  new_timeout = timeout + delay;
-  seqchange = oldseq != seqnum;
-  timechange = timer->timeout != new_timeout;
+  timer = rtp_timer_queue_peek_earliest (priv->timers);
 
-  if (!seqchange && !timechange) {
-    GST_DEBUG_OBJECT (jitterbuffer,
-        "No changes in seqnum (%d) and timeout (%" GST_TIME_FORMAT
-        "), skipping", oldseq, GST_TIME_ARGS (timer->timeout));
+  /* we never need to wakeup the timer thread when there is no more timers, if
+   * it was waiting on a clock id, it will simply do later and then wait on
+   * the conditions */
+  if (timer == NULL) {
+    GST_DEBUG_OBJECT (jitterbuffer, "no more timers");
     return;
   }
 
-  GST_DEBUG_OBJECT (jitterbuffer,
-      "replace timer %d for seqnum %d->%d timeout %" GST_TIME_FORMAT
-      "->%" GST_TIME_FORMAT, timer->type, oldseq, seqnum,
-      GST_TIME_ARGS (timer->timeout), GST_TIME_ARGS (new_timeout));
-
-  timer->timeout = new_timeout;
-  timer->seqnum = seqnum;
-  if (reset) {
-    GST_DEBUG_OBJECT (jitterbuffer, "reset rtx delay %" GST_TIME_FORMAT
-        "->%" GST_TIME_FORMAT, GST_TIME_ARGS (timer->rtx_delay),
-        GST_TIME_ARGS (delay));
-    timer->rtx_base = timeout;
-    timer->rtx_delay = delay;
-    timer->rtx_retry = 0;
-  }
-  if (seqchange) {
-    timer->num_rtx_retry = 0;
-    timer->num_rtx_received = 0;
-  }
-
-  if (priv->clock_id) {
-    /* we changed the seqnum and there is a timer currently waiting with this
-     * seqnum, unschedule it */
-    if (seqchange && priv->timer_seqnum == oldseq)
-      unschedule_current_timer (jitterbuffer);
-    /* we changed the time, check if it is earlier than what we are waiting
-     * for and unschedule if so */
-    else if (timechange)
-      recalculate_timer (jitterbuffer, timer);
-  }
-}
+  GST_DEBUG_OBJECT (jitterbuffer, "waiting till %" GST_TIME_FORMAT
+      " and earliest timeout is at %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (priv->timer_timeout), GST_TIME_ARGS (timer->timeout));
 
-static RtpTimer *
-set_timer (GstRtpJitterBuffer * jitterbuffer, RtpTimerType type,
-    guint16 seqnum, GstClockTime timeout)
-{
-  RtpTimer *timer;
-
-  /* find the seqnum timer */
-  timer = find_timer (jitterbuffer, seqnum);
-  if (timer == NULL) {
-    timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
-  } else {
-    reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
-  }
-  return timer;
-}
-
-static void
-remove_timer (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer)
-{
-  GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
-  guint idx;
+  /* wakeup the timer thread in case the timer queue was empty */
+  JBUF_SIGNAL_TIMER (priv);
 
-  if (timer->idx == -1)
+  /* no need to wait if the current wait is earlier or later */
+  if (timer->timeout != -1 && timer->timeout >= priv->timer_timeout)
     return;
 
-  if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
-    unschedule_current_timer (jitterbuffer);
-
-  idx = timer->idx;
-  GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
-  g_array_remove_index_fast (priv->timers, idx);
-  timer->idx = idx;
-
-  JBUF_SIGNAL_TIMER (priv);
-}
-
-static void
-remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
-{
-  GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
-  GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
-  g_array_set_size (priv->timers, 0);
+  /* for other cases, force a reschedule of the timer thread */
   unschedule_current_timer (jitterbuffer);
-  JBUF_SIGNAL_TIMER (priv);
 }
 
 /* get the extra delay to wait before sending RTX */
@@ -2216,14 +2077,13 @@ get_rtx_delay (GstRtpJitterBufferPrivate * priv)
 /* Check if packet with seqnum is already considered definitely lost by being
  * part of a "lost timer" for multiple packets */
 static gboolean
-already_lost (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
+already_lost (GstRtpJitterBuffer * jitterbuffer, GstClockTime pts,
+    guint16 seqnum)
 {
   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
-  gint i, len;
 
-  len = priv->timers->len;
-  for (i = 0; i < len; i++) {
-    RtpTimer *test = &g_array_index (priv->timers, RtpTimer, i);
+  RtpTimer *test = rtp_timer_queue_peek_earliest (priv->timers);
+  while (test && test->timeout <= pts) {
     gint gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
 
     if (test->num > 1 && test->type == RTP_TIMER_LOST && gap >= 0 &&
@@ -2232,6 +2092,8 @@ already_lost (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
           seqnum, test->seqnum, (test->seqnum + test->num - 1) & 0xffff);
       return TRUE;
     }
+
+    test = rtp_timer_get_next (test);
   }
 
   return FALSE;
@@ -2254,25 +2116,37 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
 {
   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
 
-  /* go through all timers and unschedule the ones with a large gap */
+  /* schedule immediatly expected timer which exceed the maximum RTX delay
+   * reorder configuration */
   if (priv->do_retransmission && priv->rtx_delay_reorder > 0) {
-    gint i, len;
-    len = priv->timers->len;
-    for (i = 0; i < len; i++) {
-      RtpTimer *test = &g_array_index (priv->timers, RtpTimer, i);
+    RtpTimer *test = rtp_timer_queue_peek_earliest (priv->timers);
+    while (test) {
       gint gap;
 
+      /* filter the timer type to speed up this loop */
+      if (test->type != RTP_TIMER_EXPECTED) {
+        test = rtp_timer_get_next (test);
+        continue;
+      }
+
       gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
 
       GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d",
           test->type, test->seqnum, seqnum, gap);
 
-      if (gap > priv->rtx_delay_reorder) {
-        /* max gap, we exceeded the max reorder distance and we don't expect the
-         * missing packet to be this reordered */
-        if (test->num_rtx_retry == 0 && test->type == RTP_TIMER_EXPECTED)
-          reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
-      }
+      /* if this expected packet have a smaller gap then the configured one,
+       * then earlier timer are not expected to have bigger gap as the timer
+       * queue is ordered */
+      if (gap <= priv->rtx_delay_reorder)
+        break;
+
+      /* max gap, we exceevded the max reorder distance and we don't expect the
+       * missing packet to be this reordered */
+      if (test->num_rtx_retry == 0 && test->type == RTP_TIMER_EXPECTED)
+        rtp_timer_queue_update_timer (priv->timers, test, test->seqnum,
+            -1, 0, 0, FALSE);
+
+      test = rtp_timer_get_next (test);
     }
   }
 
@@ -2320,16 +2194,17 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
 
     if (timer) {
       timer->type = RTP_TIMER_EXPECTED;
-      reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
-          delay, TRUE);
+      rtp_timer_queue_update_timer (priv->timers, timer, priv->next_in_seqnum,
+          expected, delay, 0, TRUE);
     } else {
-      add_timer (jitterbuffer, RTP_TIMER_EXPECTED, priv->next_in_seqnum, 0,
+      rtp_timer_queue_set_expected (priv->timers, priv->next_in_seqnum,
           expected, delay, priv->packet_spacing);
     }
   } else if (timer && timer->type != RTP_TIMER_DEADLINE) {
     /* if we had a timer, remove it, we don't know when to expect the next
      * packet. */
-    remove_timer (jitterbuffer, timer);
+    rtp_timer_queue_unschedule (priv->timers, timer);
+    rtp_timer_free (timer);
   }
 }
 
@@ -2434,8 +2309,9 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
       /* this timer will fire immediately and the lost event will be pushed from
        * the timer thread */
       if (lost_packets > 0) {
-        add_timer (jitterbuffer, RTP_TIMER_LOST, expected, lost_packets,
-            priv->last_in_pts + duration, 0, gap_time);
+        rtp_timer_queue_set_lost (priv->timers, expected, lost_packets,
+            priv->last_in_pts + duration, gap_time,
+            timeout_offset (jitterbuffer));
         expected += lost_packets;
         priv->last_in_pts += gap_time;
       }
@@ -2451,7 +2327,7 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
   }
 
   if (priv->do_retransmission) {
-    RtpTimer *timer = find_timer (jitterbuffer, expected);
+    RtpTimer *timer = rtp_timer_queue_find (priv->timers, expected);
     GstClockTime rtx_delay = get_rtx_delay (priv);
 
     /* if we had a timer for the first missing packet, update it. */
@@ -2461,8 +2337,8 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
 
       timer->duration = duration;
       if (timeout > (expected_pts + delay) && timer->num_rtx_retry == 0) {
-        reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_pts,
-            delay, TRUE);
+        rtp_timer_queue_update_timer (priv->timers, timer, timer->seqnum,
+            expected_pts, delay, 0, TRUE);
       }
       expected++;
       expected_pts += duration;
@@ -2472,15 +2348,15 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
       /* minimum delay the expected-timer has "waited" is the elapsed time
        * since expected arrival of the missing packet */
       GstClockTime delay = MAX (rtx_delay, pts - expected_pts);
-      add_timer (jitterbuffer, RTP_TIMER_EXPECTED, expected, 0, expected_pts,
+      rtp_timer_queue_set_expected (priv->timers, expected, expected_pts,
           delay, duration);
       expected_pts += duration;
       expected++;
     }
   } else {
     while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
-      add_timer (jitterbuffer, RTP_TIMER_LOST, expected, 0, expected_pts, 0,
-          duration);
+      rtp_timer_queue_set_lost (priv->timers, expected, 0, expected_pts,
+          duration, timeout_offset (jitterbuffer));
       expected_pts += duration;
       expected++;
     }
@@ -2678,7 +2554,7 @@ gst_rtp_jitter_buffer_reset (GstRtpJitterBuffer * jitterbuffer,
   rtp_jitter_buffer_flush (priv->jbuf,
       (GFunc) free_item_and_retain_sticky_events, &events);
   rtp_jitter_buffer_reset_skew (priv->jbuf);
-  remove_all_timers (jitterbuffer);
+  rtp_timer_queue_remove_all (priv->timers);
   priv->discont = TRUE;
   priv->last_popped_seqnum = -1;
 
@@ -2752,7 +2628,7 @@ gst_rtp_jitter_buffer_fast_start (GstRtpJitterBuffer * jitterbuffer)
   if (!item)
     return FALSE;
 
-  timer = find_timer (jitterbuffer, item->seqnum);
+  timer = rtp_timer_queue_find (priv->timers, item->seqnum);
   if (!timer || timer->type != RTP_TIMER_DEADLINE)
     return FALSE;
 
@@ -2761,6 +2637,7 @@ gst_rtp_jitter_buffer_fast_start (GstRtpJitterBuffer * jitterbuffer)
     GST_INFO_OBJECT (jitterbuffer, "We found %i consecutive packet, start now",
         priv->faststart_min_packets);
     timer->timeout = -1;
+    rtp_timer_queue_reschedule (priv->timers, timer);
     return TRUE;
   }
 
@@ -2907,14 +2784,22 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
       "packet_rate: %d, max_dropout: %d, max_misorder: %d", packet_rate,
       max_dropout, max_misorder);
 
-  /* now check against our expected seqnum */
-  if (G_UNLIKELY (expected == -1)) {
-    if (G_UNLIKELY (GST_BUFFER_IS_RETRANSMISSION (buffer))) {
-      /* If the first buffer is an (old) rtx, e.g. from before a reset,
-       * ignore it. */
+  timer = rtp_timer_queue_find (priv->timers, seqnum);
+  if (GST_BUFFER_IS_RETRANSMISSION (buffer)) {
+    if (G_UNLIKELY (!priv->do_retransmission))
       goto unsolicited_rtx;
-    }
 
+    if (!timer)
+      timer = rtp_timer_queue_find (priv->rtx_stats_timers, seqnum);
+
+    /* If the first buffer is an (old) rtx, e.g. from before a reset, or
+     * already lost, ignore it */
+    if (!timer || expected == -1)
+      goto unsolicited_rtx;
+  }
+
+  /* now check against our expected seqnum */
+  if (G_UNLIKELY (expected == -1)) {
     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
 
     /* calculate a pts based on rtptime and arrival time (dts) */
@@ -2931,7 +2816,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
     /* we don't know what the next_in_seqnum should be, wait for the last
      * possible moment to push this buffer, maybe we get an earlier seqnum
      * while we wait */
-    set_timer (jitterbuffer, RTP_TIMER_DEADLINE, seqnum, pts);
+    rtp_timer_queue_set_deadline (priv->timers, seqnum, pts,
+        timeout_offset (jitterbuffer));
 
     do_next_seqnum = TRUE;
     /* take rtptime and pts to calculate packet spacing */
@@ -2945,20 +2831,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
         expected, seqnum, gap);
 
-    if (G_UNLIKELY (GST_BUFFER_IS_RETRANSMISSION (buffer))) {
-      if (G_UNLIKELY (!priv->do_retransmission))
-        goto unsolicited_rtx;
-
-      /* If this packet is a rtx that we may have actually requested,
-       * make sure we actually did, or whether we still need it. */
-      timer = find_timer (jitterbuffer, seqnum);
-      if (!timer)
-        timer = timer_queue_find (priv->rtx_stats_timers, seqnum);
-      if (!timer)
-        goto unsolicited_rtx;
-    }
-
-    if (G_UNLIKELY (gap > 0 && priv->timers->len >= max_dropout)) {
+    if (G_UNLIKELY (gap > 0 &&
+            rtp_timer_queue_length (priv->timers) >= max_dropout)) {
       /* If we have timers for more than RTP_MAX_DROPOUT packets
        * pending this means that we have a huge gap overall. We can
        * reset the jitterbuffer at this point because there's
@@ -2966,7 +2840,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
        * sensible with the past data. Just try again from the
        * next packet */
       GST_WARNING_OBJECT (jitterbuffer, "%d pending timers > %d - resetting",
-          priv->timers->len, max_dropout);
+          rtp_timer_queue_length (priv->timers), max_dropout);
       gst_buffer_unref (buffer);
       return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum);
     }
@@ -3029,13 +2903,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
   }
 
-  timer = find_timer (jitterbuffer, seqnum);
-  if (GST_BUFFER_IS_RETRANSMISSION (buffer)) {
-    if (!timer)
-      timer = rtp_timer_queue_find (priv->rtx_stats_timers, seqnum);
-    if (timer)
-      timer->num_rtx_received++;
-  }
+  if (GST_BUFFER_IS_RETRANSMISSION (buffer))
+    timer->num_rtx_received++;
 
   /* At 2^15, we would detect a seqnum rollover too early, therefore
    * limit the queue size. But let's not limit it to a number that is
@@ -3043,6 +2912,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
    * sequence number, let's allow at least 10k packets in any case. */
   while (rtp_jitter_buffer_is_full (priv->jbuf) &&
       priv->srcresult == GST_FLOW_OK) {
+    update_current_timer (jitterbuffer);
     JBUF_SIGNAL_EVENT (priv);
     JBUF_WAIT_QUEUE (priv);
   }
@@ -3072,7 +2942,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
     }
   }
 
-  if (already_lost (jitterbuffer, seqnum))
+  if (already_lost (jitterbuffer, pts, seqnum))
     goto already_lost;
 
   /* let's drop oldest packet if the queue is already full and drop-on-latency
@@ -3155,6 +3025,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
   msg = check_buffering_percent (jitterbuffer, percent);
 
 finished:
+  update_current_timer (jitterbuffer);
   JBUF_UNLOCK (priv);
 
   if (msg)
@@ -3353,7 +3224,8 @@ update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
       GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
 
   if (estimated != -1 && priv->estimated_eos != estimated) {
-    set_timer (jitterbuffer, RTP_TIMER_EOS, -1, estimated);
+    rtp_timer_queue_set_eos (priv->timers, estimated,
+        timeout_offset (jitterbuffer));
     priv->estimated_eos = estimated;
   }
 }
@@ -3442,7 +3314,7 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
   if (type == ITEM_TYPE_EVENT && outevent &&
       GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
     g_assert (priv->eos);
-    while (priv->timers->len > 0) {
+    while (rtp_timer_queue_length (priv->timers) > 0) {
       /* Stopping timers */
       unschedule_current_timer (jitterbuffer);
       JBUF_WAIT_TIMER (priv);
@@ -3737,6 +3609,7 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
   GstClockTime rtx_retry_period;
   GstClockTime rtx_retry_timeout;
   GstClock *clock;
+  GstClockTimeDiff offset = 0;
 
   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive, now %"
       GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now));
@@ -3780,23 +3653,26 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
 
   /* calculate the timeout for the next retransmission attempt */
   timer->rtx_retry += rtx_retry_timeout;
-  GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
+  GST_DEBUG_OBJECT (jitterbuffer, "timer #%i base %" GST_TIME_FORMAT ", delay %"
       GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT ", num_retry %u",
-      GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
-      GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry);
+      timer->seqnum, GST_TIME_ARGS (timer->rtx_base),
+      GST_TIME_ARGS (timer->rtx_delay), GST_TIME_ARGS (timer->rtx_retry),
+      timer->num_rtx_retry);
   if ((priv->rtx_max_retries != -1
           && timer->num_rtx_retry >= priv->rtx_max_retries)
       || (timer->rtx_retry + timer->rtx_delay > rtx_retry_period)
       || (timer->rtx_base + rtx_retry_period < now)) {
-    GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
+    GST_DEBUG_OBJECT (jitterbuffer, "reschedule #%i as LOST timer",
+        timer->seqnum);
     /* too many retransmission request, we now convert the timer
      * to a lost timer, leave the num_rtx_retry as it is for stats */
     timer->type = RTP_TIMER_LOST;
     timer->rtx_delay = 0;
     timer->rtx_retry = 0;
+    offset = timeout_offset (jitterbuffer);
   }
-  reschedule_timer (jitterbuffer, timer, timer->seqnum,
-      timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
+  rtp_timer_queue_update_timer (priv->timers, timer, timer->seqnum,
+      timer->rtx_base + timer->rtx_retry, timer->rtx_delay, offset, FALSE);
 
   JBUF_UNLOCK (priv);
   gst_pad_push_event (priv->sinkpad, event);
@@ -3835,7 +3711,7 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
   /* we now only accept seqnum bigger than this */
   if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0) {
     priv->next_in_seqnum = next_in_seqnum;
-    priv->last_in_pts = apply_offset (jitterbuffer, timer->timeout);
+    priv->last_in_pts = apply_offset (jitterbuffer, get_pts_timeout (timer));
   }
 
   /* Avoid creating events if we don't need it. Note that we still need to create
@@ -3844,7 +3720,7 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
   if (priv->do_lost) {
     GstClockTime duration, timestamp;
     /* create paket lost event */
-    timestamp = apply_offset (jitterbuffer, timer->timeout);
+    timestamp = apply_offset (jitterbuffer, get_pts_timeout (timer));
     duration = timer->duration;
     if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
       duration = priv->packet_spacing;
@@ -3862,14 +3738,13 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
     rtp_jitter_buffer_free_item (item);
 
   if (GST_CLOCK_TIME_IS_VALID (timer->rtx_last)) {
-    RtpTimer *stats_timer = rtp_timer_dup (timer);
     /* Store info to update stats if the packet arrives too late */
-    /* TODO this one could be zero-copy when the timers array is replaced */
-    stats_timer->timeout = now + priv->rtx_stats_timeout * GST_MSECOND;
-    stats_timer->type = RTP_TIMER_LOST;
-    rtp_timer_queue_insert (priv->rtx_stats_timers, stats_timer);
+    timer->timeout = now + priv->rtx_stats_timeout * GST_MSECOND;
+    timer->type = RTP_TIMER_LOST;
+    rtp_timer_queue_insert (priv->rtx_stats_timers, timer);
+  } else {
+    rtp_timer_free (timer);
   }
-  remove_timer (jitterbuffer, timer);
 
   if (head)
     JBUF_SIGNAL_EVENT (priv);
@@ -3884,7 +3759,7 @@ do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
 
   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
-  remove_timer (jitterbuffer, timer);
+  rtp_timer_free (timer);
   if (!priv->eos) {
     GstEvent *event;
 
@@ -3911,7 +3786,7 @@ do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
    * only mess with current ongoing seqnum if still unknown */
   if (priv->next_seqnum == -1)
     priv->next_seqnum = timer->seqnum;
-  remove_timer (jitterbuffer, timer);
+  rtp_timer_free (timer);
   JBUF_SIGNAL_EVENT (priv);
 
   return TRUE;
@@ -3956,8 +3831,14 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
   JBUF_LOCK (priv);
   while (priv->timer_running) {
     RtpTimer *timer = NULL;
-    GstClockTime timer_timeout = -1;
-    gint i, len;
+    GQueue timers = G_QUEUE_INIT;
+
+    /* don't produce data in paused */
+    while (priv->blocked) {
+      JBUF_WAIT_TIMER (priv);
+      if (!priv->timer_running)
+        goto stopping;
+    }
 
     /* If we have a clock, update "now" now with the very
      * latest running time we have. If timers are unscheduled below we
@@ -3982,77 +3863,33 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
     if (priv->do_retransmission)
       rtp_timer_queue_remove_until (priv->rtx_stats_timers, now);
 
-    /* Iterate "normal" timers */
-    len = priv->timers->len;
-    for (i = 0; i < len;) {
-      RtpTimer *test = &g_array_index (priv->timers, RtpTimer, i);
-      GstClockTime test_timeout = get_timeout (jitterbuffer, test);
-      gboolean save_best = FALSE;
-
-      GST_DEBUG_OBJECT (jitterbuffer,
-          "%d, %d, %d, %" GST_TIME_FORMAT " diff:%" GST_STIME_FORMAT, i,
-          test->type, test->seqnum, GST_TIME_ARGS (test_timeout),
-          GST_STIME_ARGS ((gint64) (test_timeout - now)));
-
-      /* Weed out anything too late */
-      if (test->type == RTP_TIMER_LOST &&
-          (test_timeout == -1 || test_timeout <= now)) {
-        GST_DEBUG_OBJECT (jitterbuffer, "Weeding out late entry");
-        do_lost_timeout (jitterbuffer, test, now);
-        if (!priv->timer_running)
-          break;
-        /* We don't move the iterator forward since we just removed the current entry,
-         * but we update the termination condition */
-        len = priv->timers->len;
-      } else {
-        /* find the smallest timeout */
-        if (timer == NULL) {
-          save_best = TRUE;
-        } else if (timer_timeout == -1) {
-          /* we already have an immediate timeout, the new timer must be an
-           * immediate timer with smaller seqnum to become the best */
-          if (test_timeout == -1
-              && (gst_rtp_buffer_compare_seqnum (test->seqnum,
-                      timer->seqnum) > 0))
-            save_best = TRUE;
-        } else if (test_timeout == -1) {
-          /* first immediate timer */
-          save_best = TRUE;
-        } else if (test_timeout < timer_timeout) {
-          /* earlier timer */
-          save_best = TRUE;
-        } else if (test_timeout == timer_timeout
-            && (gst_rtp_buffer_compare_seqnum (test->seqnum,
-                    timer->seqnum) > 0)) {
-          /* same timer, smaller seqnum */
-          save_best = TRUE;
+    /* Iterate expired "normal" timers */
+    while ((timer = rtp_timer_queue_pop_until (priv->timers, now))) {
+      do {
+        if (timer->type == RTP_TIMER_LOST) {
+          GST_DEBUG_OBJECT (jitterbuffer, "Weeding out expired lost timers");
+          do_lost_timeout (jitterbuffer, timer, now);
+        } else {
+          g_queue_push_tail_link (&timers, (GList *) timer);
         }
+      } while ((timer = rtp_timer_queue_pop_until (priv->timers, now)));
 
-        if (save_best) {
-          GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
-          timer = test;
-          timer_timeout = test_timeout;
-        }
-        i++;
-      }
+      /* execetute the remaining timers */
+      while ((timer = (RtpTimer *) g_queue_pop_head_link (&timers)))
+        do_timeout (jitterbuffer, timer, now);
     }
-    if (timer && !priv->blocked) {
+
+    timer = rtp_timer_queue_peek_earliest (priv->timers);
+    if (timer) {
       GstClock *clock;
       GstClockTime sync_time;
       GstClockID id;
       GstClockReturn ret;
       GstClockTimeDiff clock_jitter;
 
-      if (timer_timeout == -1 || timer_timeout <= now || priv->eos) {
-        /* We have normally removed all lost timers in the loop above */
-        g_assert (timer->type != RTP_TIMER_LOST);
-
-        do_timeout (jitterbuffer, timer, now);
-        /* check here, do_timeout could have released the lock */
-        if (!priv->timer_running)
-          break;
-        continue;
-      }
+      /* we poped all immediate and due timer, so this should just never
+       * happens */
+      g_assert (GST_CLOCK_TIME_IS_VALID (timer->timeout));
 
       GST_OBJECT_LOCK (jitterbuffer);
       clock = GST_ELEMENT_CLOCK (jitterbuffer);
@@ -4060,22 +3897,22 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
         GST_OBJECT_UNLOCK (jitterbuffer);
         /* let's just push if there is no clock */
         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
-        now = timer_timeout;
+        now = timer->timeout;
         continue;
       }
 
       /* prepare for sync against clock */
-      sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
+      sync_time = timer->timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
       /* add latency of peer to get input time */
       sync_time += priv->peer_latency;
 
-      GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
-          " with sync time %" GST_TIME_FORMAT,
-          GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
+      GST_DEBUG_OBJECT (jitterbuffer, "timer #%i sync to timestamp %"
+          GST_TIME_FORMAT " with sync time %" GST_TIME_FORMAT, timer->seqnum,
+          GST_TIME_ARGS (timer->timeout), GST_TIME_ARGS (sync_time));
 
       /* create an entry for the clock */
       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
-      priv->timer_timeout = timer_timeout;
+      priv->timer_timeout = timer->timeout;
       priv->timer_seqnum = timer->seqnum;
       GST_OBJECT_UNLOCK (jitterbuffer);
 
@@ -4085,6 +3922,7 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
       ret = gst_clock_id_wait (id, &clock_jitter);
 
       JBUF_LOCK (priv);
+
       if (!priv->timer_running) {
         gst_clock_id_unref (id);
         priv->clock_id = NULL;
@@ -4092,21 +3930,29 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
       }
 
       if (ret != GST_CLOCK_UNSCHEDULED) {
-        now = timer_timeout + MAX (clock_jitter, 0);
+        now = timer->timeout + MAX (clock_jitter, 0);
         GST_DEBUG_OBJECT (jitterbuffer,
             "sync done, %d, #%d, %" GST_STIME_FORMAT, ret, priv->timer_seqnum,
             GST_STIME_ARGS (clock_jitter));
       } else {
         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
       }
+
       /* and free the entry */
       gst_clock_id_unref (id);
       priv->clock_id = NULL;
     } else {
+      /* when draining the timers, the pusher thread will reuse our
+       * condition to wait for completion. Signal that thread before
+       * sleeping again here */
+      if (priv->eos)
+        JBUF_SIGNAL_TIMER (priv);
+
       /* no timers, wait for activity */
       JBUF_WAIT_TIMER (priv);
     }
   }
+stopping:
   JBUF_UNLOCK (priv);
 
   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
index cd8f284..b172125 100644 (file)
@@ -59,9 +59,6 @@ typedef struct
   GList list;
   gboolean queued;
 
-  /* FIXME remove */
-  guint idx;
-
   guint16 seqnum;
   guint num;
   RtpTimerType type;