GstClockTime last_in_pts;
guint32 next_in_seqnum;
- GArray *timers;
+ /* "normal" timers */
+ RtpTimerQueue *timers;
+ /* timers used for RTX statistics backlog */
RtpTimerQueue *rtx_stats_timers;
/* start and stop ranges */
static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
-static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
priv->last_rtptime = -1;
priv->avg_jitter = 0;
priv->segment_seqnum = GST_SEQNUM_INVALID;
- priv->timers = g_array_new (FALSE, TRUE, sizeof (RtpTimer));
+ priv->timers = rtp_timer_queue_new ();
priv->rtx_stats_timers = rtp_timer_queue_new ();
priv->jbuf = rtp_jitter_buffer_new ();
g_mutex_init (&priv->jbuf_lock);
jitterbuffer = GST_RTP_JITTER_BUFFER (object);
priv = jitterbuffer->priv;
- g_array_free (priv->timers, TRUE);
+ g_object_unref (priv->timers);
g_object_unref (priv->rtx_stats_timers);
g_mutex_clear (&priv->jbuf_lock);
g_cond_clear (&priv->jbuf_queue);
rtp_jitter_buffer_flush (priv->jbuf, NULL, NULL);
rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
rtp_jitter_buffer_reset_skew (priv->jbuf);
- remove_all_timers (jitterbuffer);
+ rtp_timer_queue_remove_all (priv->timers);
g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
g_queue_clear (&priv->gap_packets);
JBUF_UNLOCK (priv);
return timestamp;
}
-static RtpTimer *
-find_timer (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
+static GstClockTimeDiff
+timeout_offset (GstRtpJitterBuffer * jitterbuffer)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- RtpTimer *timer = NULL;
- gint i, len;
-
- len = priv->timers->len;
- for (i = 0; i < len; i++) {
- RtpTimer *test = &g_array_index (priv->timers, RtpTimer, i);
- if (test->seqnum == seqnum) {
- timer = test;
- break;
- }
- }
- return timer;
+ return priv->ts_offset + priv->out_offset + priv->latency_ns;
}
static void
}
static GstClockTime
-get_timeout (GstRtpJitterBuffer * jitterbuffer, const RtpTimer * timer)
+get_pts_timeout (const RtpTimer * timer)
{
- GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- GstClockTime test_timeout;
-
- if ((test_timeout = timer->timeout) == -1)
+ if (timer->timeout == -1)
return -1;
- if (timer->type != RTP_TIMER_EXPECTED) {
- /* add our latency and offset to get output times. */
- test_timeout = apply_offset (jitterbuffer, test_timeout);
- test_timeout += priv->latency_ns;
- }
- return test_timeout;
+ return timer->timeout - timer->offset;
}
static void
-recalculate_timer (GstRtpJitterBuffer * jitterbuffer, const RtpTimer * timer)
-{
- GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
-
- if (priv->clock_id) {
- GstClockTime timeout = get_timeout (jitterbuffer, timer);
-
- GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
- GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
-
- if (timeout == -1 || timeout < priv->timer_timeout)
- unschedule_current_timer (jitterbuffer);
- }
-}
-
-static RtpTimer *
-add_timer (GstRtpJitterBuffer * jitterbuffer, RtpTimerType type,
- guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
- GstClockTime duration)
+update_current_timer (GstRtpJitterBuffer * jitterbuffer)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
RtpTimer *timer;
- gint len;
-
- GST_DEBUG_OBJECT (jitterbuffer,
- "add timer %d for seqnum %d to %" GST_TIME_FORMAT ", delay %"
- GST_TIME_FORMAT, type, seqnum, GST_TIME_ARGS (timeout),
- GST_TIME_ARGS (delay));
-
- len = priv->timers->len;
- g_array_set_size (priv->timers, len + 1);
- timer = &g_array_index (priv->timers, RtpTimer, len);
- timer->idx = len;
- timer->type = type;
- timer->seqnum = seqnum;
- timer->num = num;
- timer->timeout = timeout + delay;
- timer->duration = duration;
- if (type == RTP_TIMER_EXPECTED) {
- timer->rtx_base = timeout;
- timer->rtx_delay = delay;
- timer->rtx_retry = 0;
- }
- timer->rtx_last = GST_CLOCK_TIME_NONE;
- timer->num_rtx_retry = 0;
- timer->num_rtx_received = 0;
- recalculate_timer (jitterbuffer, timer);
- JBUF_SIGNAL_TIMER (priv);
-
- return timer;
-}
-
-static void
-reschedule_timer (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
- guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
-{
- GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- gboolean seqchange, timechange;
- guint16 oldseq;
- GstClockTime new_timeout;
- oldseq = timer->seqnum;
- new_timeout = timeout + delay;
- seqchange = oldseq != seqnum;
- timechange = timer->timeout != new_timeout;
+ timer = rtp_timer_queue_peek_earliest (priv->timers);
- if (!seqchange && !timechange) {
- GST_DEBUG_OBJECT (jitterbuffer,
- "No changes in seqnum (%d) and timeout (%" GST_TIME_FORMAT
- "), skipping", oldseq, GST_TIME_ARGS (timer->timeout));
+ /* we never need to wakeup the timer thread when there is no more timers, if
+ * it was waiting on a clock id, it will simply do later and then wait on
+ * the conditions */
+ if (timer == NULL) {
+ GST_DEBUG_OBJECT (jitterbuffer, "no more timers");
return;
}
- GST_DEBUG_OBJECT (jitterbuffer,
- "replace timer %d for seqnum %d->%d timeout %" GST_TIME_FORMAT
- "->%" GST_TIME_FORMAT, timer->type, oldseq, seqnum,
- GST_TIME_ARGS (timer->timeout), GST_TIME_ARGS (new_timeout));
-
- timer->timeout = new_timeout;
- timer->seqnum = seqnum;
- if (reset) {
- GST_DEBUG_OBJECT (jitterbuffer, "reset rtx delay %" GST_TIME_FORMAT
- "->%" GST_TIME_FORMAT, GST_TIME_ARGS (timer->rtx_delay),
- GST_TIME_ARGS (delay));
- timer->rtx_base = timeout;
- timer->rtx_delay = delay;
- timer->rtx_retry = 0;
- }
- if (seqchange) {
- timer->num_rtx_retry = 0;
- timer->num_rtx_received = 0;
- }
-
- if (priv->clock_id) {
- /* we changed the seqnum and there is a timer currently waiting with this
- * seqnum, unschedule it */
- if (seqchange && priv->timer_seqnum == oldseq)
- unschedule_current_timer (jitterbuffer);
- /* we changed the time, check if it is earlier than what we are waiting
- * for and unschedule if so */
- else if (timechange)
- recalculate_timer (jitterbuffer, timer);
- }
-}
+ GST_DEBUG_OBJECT (jitterbuffer, "waiting till %" GST_TIME_FORMAT
+ " and earliest timeout is at %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (priv->timer_timeout), GST_TIME_ARGS (timer->timeout));
-static RtpTimer *
-set_timer (GstRtpJitterBuffer * jitterbuffer, RtpTimerType type,
- guint16 seqnum, GstClockTime timeout)
-{
- RtpTimer *timer;
-
- /* find the seqnum timer */
- timer = find_timer (jitterbuffer, seqnum);
- if (timer == NULL) {
- timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
- } else {
- reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
- }
- return timer;
-}
-
-static void
-remove_timer (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer)
-{
- GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- guint idx;
+ /* wakeup the timer thread in case the timer queue was empty */
+ JBUF_SIGNAL_TIMER (priv);
- if (timer->idx == -1)
+ /* no need to wait if the current wait is earlier or later */
+ if (timer->timeout != -1 && timer->timeout >= priv->timer_timeout)
return;
- if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
- unschedule_current_timer (jitterbuffer);
-
- idx = timer->idx;
- GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
- g_array_remove_index_fast (priv->timers, idx);
- timer->idx = idx;
-
- JBUF_SIGNAL_TIMER (priv);
-}
-
-static void
-remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
-{
- GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
- g_array_set_size (priv->timers, 0);
+ /* for other cases, force a reschedule of the timer thread */
unschedule_current_timer (jitterbuffer);
- JBUF_SIGNAL_TIMER (priv);
}
/* get the extra delay to wait before sending RTX */
/* Check if packet with seqnum is already considered definitely lost by being
* part of a "lost timer" for multiple packets */
static gboolean
-already_lost (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
+already_lost (GstRtpJitterBuffer * jitterbuffer, GstClockTime pts,
+ guint16 seqnum)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- gint i, len;
- len = priv->timers->len;
- for (i = 0; i < len; i++) {
- RtpTimer *test = &g_array_index (priv->timers, RtpTimer, i);
+ RtpTimer *test = rtp_timer_queue_peek_earliest (priv->timers);
+ while (test && test->timeout <= pts) {
gint gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
if (test->num > 1 && test->type == RTP_TIMER_LOST && gap >= 0 &&
seqnum, test->seqnum, (test->seqnum + test->num - 1) & 0xffff);
return TRUE;
}
+
+ test = rtp_timer_get_next (test);
}
return FALSE;
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- /* go through all timers and unschedule the ones with a large gap */
+ /* schedule immediatly expected timer which exceed the maximum RTX delay
+ * reorder configuration */
if (priv->do_retransmission && priv->rtx_delay_reorder > 0) {
- gint i, len;
- len = priv->timers->len;
- for (i = 0; i < len; i++) {
- RtpTimer *test = &g_array_index (priv->timers, RtpTimer, i);
+ RtpTimer *test = rtp_timer_queue_peek_earliest (priv->timers);
+ while (test) {
gint gap;
+ /* filter the timer type to speed up this loop */
+ if (test->type != RTP_TIMER_EXPECTED) {
+ test = rtp_timer_get_next (test);
+ continue;
+ }
+
gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d",
test->type, test->seqnum, seqnum, gap);
- if (gap > priv->rtx_delay_reorder) {
- /* max gap, we exceeded the max reorder distance and we don't expect the
- * missing packet to be this reordered */
- if (test->num_rtx_retry == 0 && test->type == RTP_TIMER_EXPECTED)
- reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
- }
+ /* if this expected packet have a smaller gap then the configured one,
+ * then earlier timer are not expected to have bigger gap as the timer
+ * queue is ordered */
+ if (gap <= priv->rtx_delay_reorder)
+ break;
+
+ /* max gap, we exceevded the max reorder distance and we don't expect the
+ * missing packet to be this reordered */
+ if (test->num_rtx_retry == 0 && test->type == RTP_TIMER_EXPECTED)
+ rtp_timer_queue_update_timer (priv->timers, test, test->seqnum,
+ -1, 0, 0, FALSE);
+
+ test = rtp_timer_get_next (test);
}
}
if (timer) {
timer->type = RTP_TIMER_EXPECTED;
- reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
- delay, TRUE);
+ rtp_timer_queue_update_timer (priv->timers, timer, priv->next_in_seqnum,
+ expected, delay, 0, TRUE);
} else {
- add_timer (jitterbuffer, RTP_TIMER_EXPECTED, priv->next_in_seqnum, 0,
+ rtp_timer_queue_set_expected (priv->timers, priv->next_in_seqnum,
expected, delay, priv->packet_spacing);
}
} else if (timer && timer->type != RTP_TIMER_DEADLINE) {
/* if we had a timer, remove it, we don't know when to expect the next
* packet. */
- remove_timer (jitterbuffer, timer);
+ rtp_timer_queue_unschedule (priv->timers, timer);
+ rtp_timer_free (timer);
}
}
/* this timer will fire immediately and the lost event will be pushed from
* the timer thread */
if (lost_packets > 0) {
- add_timer (jitterbuffer, RTP_TIMER_LOST, expected, lost_packets,
- priv->last_in_pts + duration, 0, gap_time);
+ rtp_timer_queue_set_lost (priv->timers, expected, lost_packets,
+ priv->last_in_pts + duration, gap_time,
+ timeout_offset (jitterbuffer));
expected += lost_packets;
priv->last_in_pts += gap_time;
}
}
if (priv->do_retransmission) {
- RtpTimer *timer = find_timer (jitterbuffer, expected);
+ RtpTimer *timer = rtp_timer_queue_find (priv->timers, expected);
GstClockTime rtx_delay = get_rtx_delay (priv);
/* if we had a timer for the first missing packet, update it. */
timer->duration = duration;
if (timeout > (expected_pts + delay) && timer->num_rtx_retry == 0) {
- reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_pts,
- delay, TRUE);
+ rtp_timer_queue_update_timer (priv->timers, timer, timer->seqnum,
+ expected_pts, delay, 0, TRUE);
}
expected++;
expected_pts += duration;
/* minimum delay the expected-timer has "waited" is the elapsed time
* since expected arrival of the missing packet */
GstClockTime delay = MAX (rtx_delay, pts - expected_pts);
- add_timer (jitterbuffer, RTP_TIMER_EXPECTED, expected, 0, expected_pts,
+ rtp_timer_queue_set_expected (priv->timers, expected, expected_pts,
delay, duration);
expected_pts += duration;
expected++;
}
} else {
while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
- add_timer (jitterbuffer, RTP_TIMER_LOST, expected, 0, expected_pts, 0,
- duration);
+ rtp_timer_queue_set_lost (priv->timers, expected, 0, expected_pts,
+ duration, timeout_offset (jitterbuffer));
expected_pts += duration;
expected++;
}
rtp_jitter_buffer_flush (priv->jbuf,
(GFunc) free_item_and_retain_sticky_events, &events);
rtp_jitter_buffer_reset_skew (priv->jbuf);
- remove_all_timers (jitterbuffer);
+ rtp_timer_queue_remove_all (priv->timers);
priv->discont = TRUE;
priv->last_popped_seqnum = -1;
if (!item)
return FALSE;
- timer = find_timer (jitterbuffer, item->seqnum);
+ timer = rtp_timer_queue_find (priv->timers, item->seqnum);
if (!timer || timer->type != RTP_TIMER_DEADLINE)
return FALSE;
GST_INFO_OBJECT (jitterbuffer, "We found %i consecutive packet, start now",
priv->faststart_min_packets);
timer->timeout = -1;
+ rtp_timer_queue_reschedule (priv->timers, timer);
return TRUE;
}
"packet_rate: %d, max_dropout: %d, max_misorder: %d", packet_rate,
max_dropout, max_misorder);
- /* now check against our expected seqnum */
- if (G_UNLIKELY (expected == -1)) {
- if (G_UNLIKELY (GST_BUFFER_IS_RETRANSMISSION (buffer))) {
- /* If the first buffer is an (old) rtx, e.g. from before a reset,
- * ignore it. */
+ timer = rtp_timer_queue_find (priv->timers, seqnum);
+ if (GST_BUFFER_IS_RETRANSMISSION (buffer)) {
+ if (G_UNLIKELY (!priv->do_retransmission))
goto unsolicited_rtx;
- }
+ if (!timer)
+ timer = rtp_timer_queue_find (priv->rtx_stats_timers, seqnum);
+
+ /* If the first buffer is an (old) rtx, e.g. from before a reset, or
+ * already lost, ignore it */
+ if (!timer || expected == -1)
+ goto unsolicited_rtx;
+ }
+
+ /* now check against our expected seqnum */
+ if (G_UNLIKELY (expected == -1)) {
GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
/* calculate a pts based on rtptime and arrival time (dts) */
/* we don't know what the next_in_seqnum should be, wait for the last
* possible moment to push this buffer, maybe we get an earlier seqnum
* while we wait */
- set_timer (jitterbuffer, RTP_TIMER_DEADLINE, seqnum, pts);
+ rtp_timer_queue_set_deadline (priv->timers, seqnum, pts,
+ timeout_offset (jitterbuffer));
do_next_seqnum = TRUE;
/* take rtptime and pts to calculate packet spacing */
GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
expected, seqnum, gap);
- if (G_UNLIKELY (GST_BUFFER_IS_RETRANSMISSION (buffer))) {
- if (G_UNLIKELY (!priv->do_retransmission))
- goto unsolicited_rtx;
-
- /* If this packet is a rtx that we may have actually requested,
- * make sure we actually did, or whether we still need it. */
- timer = find_timer (jitterbuffer, seqnum);
- if (!timer)
- timer = timer_queue_find (priv->rtx_stats_timers, seqnum);
- if (!timer)
- goto unsolicited_rtx;
- }
-
- if (G_UNLIKELY (gap > 0 && priv->timers->len >= max_dropout)) {
+ if (G_UNLIKELY (gap > 0 &&
+ rtp_timer_queue_length (priv->timers) >= max_dropout)) {
/* If we have timers for more than RTP_MAX_DROPOUT packets
* pending this means that we have a huge gap overall. We can
* reset the jitterbuffer at this point because there's
* sensible with the past data. Just try again from the
* next packet */
GST_WARNING_OBJECT (jitterbuffer, "%d pending timers > %d - resetting",
- priv->timers->len, max_dropout);
+ rtp_timer_queue_length (priv->timers), max_dropout);
gst_buffer_unref (buffer);
return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum);
}
priv->next_in_seqnum = (seqnum + 1) & 0xffff;
}
- timer = find_timer (jitterbuffer, seqnum);
- if (GST_BUFFER_IS_RETRANSMISSION (buffer)) {
- if (!timer)
- timer = rtp_timer_queue_find (priv->rtx_stats_timers, seqnum);
- if (timer)
- timer->num_rtx_received++;
- }
+ if (GST_BUFFER_IS_RETRANSMISSION (buffer))
+ timer->num_rtx_received++;
/* At 2^15, we would detect a seqnum rollover too early, therefore
* limit the queue size. But let's not limit it to a number that is
* sequence number, let's allow at least 10k packets in any case. */
while (rtp_jitter_buffer_is_full (priv->jbuf) &&
priv->srcresult == GST_FLOW_OK) {
+ update_current_timer (jitterbuffer);
JBUF_SIGNAL_EVENT (priv);
JBUF_WAIT_QUEUE (priv);
}
}
}
- if (already_lost (jitterbuffer, seqnum))
+ if (already_lost (jitterbuffer, pts, seqnum))
goto already_lost;
/* let's drop oldest packet if the queue is already full and drop-on-latency
msg = check_buffering_percent (jitterbuffer, percent);
finished:
+ update_current_timer (jitterbuffer);
JBUF_UNLOCK (priv);
if (msg)
GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
if (estimated != -1 && priv->estimated_eos != estimated) {
- set_timer (jitterbuffer, RTP_TIMER_EOS, -1, estimated);
+ rtp_timer_queue_set_eos (priv->timers, estimated,
+ timeout_offset (jitterbuffer));
priv->estimated_eos = estimated;
}
}
if (type == ITEM_TYPE_EVENT && outevent &&
GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
g_assert (priv->eos);
- while (priv->timers->len > 0) {
+ while (rtp_timer_queue_length (priv->timers) > 0) {
/* Stopping timers */
unschedule_current_timer (jitterbuffer);
JBUF_WAIT_TIMER (priv);
GstClockTime rtx_retry_period;
GstClockTime rtx_retry_timeout;
GstClock *clock;
+ GstClockTimeDiff offset = 0;
GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive, now %"
GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now));
/* calculate the timeout for the next retransmission attempt */
timer->rtx_retry += rtx_retry_timeout;
- GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
+ GST_DEBUG_OBJECT (jitterbuffer, "timer #%i base %" GST_TIME_FORMAT ", delay %"
GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT ", num_retry %u",
- GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
- GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry);
+ timer->seqnum, GST_TIME_ARGS (timer->rtx_base),
+ GST_TIME_ARGS (timer->rtx_delay), GST_TIME_ARGS (timer->rtx_retry),
+ timer->num_rtx_retry);
if ((priv->rtx_max_retries != -1
&& timer->num_rtx_retry >= priv->rtx_max_retries)
|| (timer->rtx_retry + timer->rtx_delay > rtx_retry_period)
|| (timer->rtx_base + rtx_retry_period < now)) {
- GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
+ GST_DEBUG_OBJECT (jitterbuffer, "reschedule #%i as LOST timer",
+ timer->seqnum);
/* too many retransmission request, we now convert the timer
* to a lost timer, leave the num_rtx_retry as it is for stats */
timer->type = RTP_TIMER_LOST;
timer->rtx_delay = 0;
timer->rtx_retry = 0;
+ offset = timeout_offset (jitterbuffer);
}
- reschedule_timer (jitterbuffer, timer, timer->seqnum,
- timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
+ rtp_timer_queue_update_timer (priv->timers, timer, timer->seqnum,
+ timer->rtx_base + timer->rtx_retry, timer->rtx_delay, offset, FALSE);
JBUF_UNLOCK (priv);
gst_pad_push_event (priv->sinkpad, event);
/* we now only accept seqnum bigger than this */
if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0) {
priv->next_in_seqnum = next_in_seqnum;
- priv->last_in_pts = apply_offset (jitterbuffer, timer->timeout);
+ priv->last_in_pts = apply_offset (jitterbuffer, get_pts_timeout (timer));
}
/* Avoid creating events if we don't need it. Note that we still need to create
if (priv->do_lost) {
GstClockTime duration, timestamp;
/* create paket lost event */
- timestamp = apply_offset (jitterbuffer, timer->timeout);
+ timestamp = apply_offset (jitterbuffer, get_pts_timeout (timer));
duration = timer->duration;
if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
duration = priv->packet_spacing;
rtp_jitter_buffer_free_item (item);
if (GST_CLOCK_TIME_IS_VALID (timer->rtx_last)) {
- RtpTimer *stats_timer = rtp_timer_dup (timer);
/* Store info to update stats if the packet arrives too late */
- /* TODO this one could be zero-copy when the timers array is replaced */
- stats_timer->timeout = now + priv->rtx_stats_timeout * GST_MSECOND;
- stats_timer->type = RTP_TIMER_LOST;
- rtp_timer_queue_insert (priv->rtx_stats_timers, stats_timer);
+ timer->timeout = now + priv->rtx_stats_timeout * GST_MSECOND;
+ timer->type = RTP_TIMER_LOST;
+ rtp_timer_queue_insert (priv->rtx_stats_timers, timer);
+ } else {
+ rtp_timer_free (timer);
}
- remove_timer (jitterbuffer, timer);
if (head)
JBUF_SIGNAL_EVENT (priv);
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
- remove_timer (jitterbuffer, timer);
+ rtp_timer_free (timer);
if (!priv->eos) {
GstEvent *event;
* only mess with current ongoing seqnum if still unknown */
if (priv->next_seqnum == -1)
priv->next_seqnum = timer->seqnum;
- remove_timer (jitterbuffer, timer);
+ rtp_timer_free (timer);
JBUF_SIGNAL_EVENT (priv);
return TRUE;
JBUF_LOCK (priv);
while (priv->timer_running) {
RtpTimer *timer = NULL;
- GstClockTime timer_timeout = -1;
- gint i, len;
+ GQueue timers = G_QUEUE_INIT;
+
+ /* don't produce data in paused */
+ while (priv->blocked) {
+ JBUF_WAIT_TIMER (priv);
+ if (!priv->timer_running)
+ goto stopping;
+ }
/* If we have a clock, update "now" now with the very
* latest running time we have. If timers are unscheduled below we
if (priv->do_retransmission)
rtp_timer_queue_remove_until (priv->rtx_stats_timers, now);
- /* Iterate "normal" timers */
- len = priv->timers->len;
- for (i = 0; i < len;) {
- RtpTimer *test = &g_array_index (priv->timers, RtpTimer, i);
- GstClockTime test_timeout = get_timeout (jitterbuffer, test);
- gboolean save_best = FALSE;
-
- GST_DEBUG_OBJECT (jitterbuffer,
- "%d, %d, %d, %" GST_TIME_FORMAT " diff:%" GST_STIME_FORMAT, i,
- test->type, test->seqnum, GST_TIME_ARGS (test_timeout),
- GST_STIME_ARGS ((gint64) (test_timeout - now)));
-
- /* Weed out anything too late */
- if (test->type == RTP_TIMER_LOST &&
- (test_timeout == -1 || test_timeout <= now)) {
- GST_DEBUG_OBJECT (jitterbuffer, "Weeding out late entry");
- do_lost_timeout (jitterbuffer, test, now);
- if (!priv->timer_running)
- break;
- /* We don't move the iterator forward since we just removed the current entry,
- * but we update the termination condition */
- len = priv->timers->len;
- } else {
- /* find the smallest timeout */
- if (timer == NULL) {
- save_best = TRUE;
- } else if (timer_timeout == -1) {
- /* we already have an immediate timeout, the new timer must be an
- * immediate timer with smaller seqnum to become the best */
- if (test_timeout == -1
- && (gst_rtp_buffer_compare_seqnum (test->seqnum,
- timer->seqnum) > 0))
- save_best = TRUE;
- } else if (test_timeout == -1) {
- /* first immediate timer */
- save_best = TRUE;
- } else if (test_timeout < timer_timeout) {
- /* earlier timer */
- save_best = TRUE;
- } else if (test_timeout == timer_timeout
- && (gst_rtp_buffer_compare_seqnum (test->seqnum,
- timer->seqnum) > 0)) {
- /* same timer, smaller seqnum */
- save_best = TRUE;
+ /* Iterate expired "normal" timers */
+ while ((timer = rtp_timer_queue_pop_until (priv->timers, now))) {
+ do {
+ if (timer->type == RTP_TIMER_LOST) {
+ GST_DEBUG_OBJECT (jitterbuffer, "Weeding out expired lost timers");
+ do_lost_timeout (jitterbuffer, timer, now);
+ } else {
+ g_queue_push_tail_link (&timers, (GList *) timer);
}
+ } while ((timer = rtp_timer_queue_pop_until (priv->timers, now)));
- if (save_best) {
- GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
- timer = test;
- timer_timeout = test_timeout;
- }
- i++;
- }
+ /* execetute the remaining timers */
+ while ((timer = (RtpTimer *) g_queue_pop_head_link (&timers)))
+ do_timeout (jitterbuffer, timer, now);
}
- if (timer && !priv->blocked) {
+
+ timer = rtp_timer_queue_peek_earliest (priv->timers);
+ if (timer) {
GstClock *clock;
GstClockTime sync_time;
GstClockID id;
GstClockReturn ret;
GstClockTimeDiff clock_jitter;
- if (timer_timeout == -1 || timer_timeout <= now || priv->eos) {
- /* We have normally removed all lost timers in the loop above */
- g_assert (timer->type != RTP_TIMER_LOST);
-
- do_timeout (jitterbuffer, timer, now);
- /* check here, do_timeout could have released the lock */
- if (!priv->timer_running)
- break;
- continue;
- }
+ /* we poped all immediate and due timer, so this should just never
+ * happens */
+ g_assert (GST_CLOCK_TIME_IS_VALID (timer->timeout));
GST_OBJECT_LOCK (jitterbuffer);
clock = GST_ELEMENT_CLOCK (jitterbuffer);
GST_OBJECT_UNLOCK (jitterbuffer);
/* let's just push if there is no clock */
GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
- now = timer_timeout;
+ now = timer->timeout;
continue;
}
/* prepare for sync against clock */
- sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
+ sync_time = timer->timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
/* add latency of peer to get input time */
sync_time += priv->peer_latency;
- GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
- " with sync time %" GST_TIME_FORMAT,
- GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
+ GST_DEBUG_OBJECT (jitterbuffer, "timer #%i sync to timestamp %"
+ GST_TIME_FORMAT " with sync time %" GST_TIME_FORMAT, timer->seqnum,
+ GST_TIME_ARGS (timer->timeout), GST_TIME_ARGS (sync_time));
/* create an entry for the clock */
id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
- priv->timer_timeout = timer_timeout;
+ priv->timer_timeout = timer->timeout;
priv->timer_seqnum = timer->seqnum;
GST_OBJECT_UNLOCK (jitterbuffer);
ret = gst_clock_id_wait (id, &clock_jitter);
JBUF_LOCK (priv);
+
if (!priv->timer_running) {
gst_clock_id_unref (id);
priv->clock_id = NULL;
}
if (ret != GST_CLOCK_UNSCHEDULED) {
- now = timer_timeout + MAX (clock_jitter, 0);
+ now = timer->timeout + MAX (clock_jitter, 0);
GST_DEBUG_OBJECT (jitterbuffer,
"sync done, %d, #%d, %" GST_STIME_FORMAT, ret, priv->timer_seqnum,
GST_STIME_ARGS (clock_jitter));
} else {
GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
}
+
/* and free the entry */
gst_clock_id_unref (id);
priv->clock_id = NULL;
} else {
+ /* when draining the timers, the pusher thread will reuse our
+ * condition to wait for completion. Signal that thread before
+ * sleeping again here */
+ if (priv->eos)
+ JBUF_SIGNAL_TIMER (priv);
+
/* no timers, wait for activity */
JBUF_WAIT_TIMER (priv);
}
}
+stopping:
JBUF_UNLOCK (priv);
GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");