From: Wim Taymans Date: Tue, 30 Jul 2013 21:14:24 +0000 (+0200) Subject: rtpjitterbuffer: refactor jitterbuffer X-Git-Tag: 1.19.3~509^2~5511 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=77846d35c6f1676480016c5654e207ead50d1024;p=platform%2Fupstream%2Fgstreamer.git rtpjitterbuffer: refactor jitterbuffer Refactor the jitterbuffer code. Make separate function for peeking a buffer, pushing the next buffer, waiting for timeouts and handling the timeouts. The main loop now tries to push as many buffers as it can until it runs out of buffers or when it detects a seqnum discont. Then it will wait for some event to happen before attempting to push more buffers. Make methods to register timeouts in an array. These timeouts are registered when we detect a missing packet, sync for the first packet or when we find an estimation for the end-of-stream. This greatly simplifies and clarifies the code and also makes it possible to register more complicated timeout schemes later. --- diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index df2cd3d..8d4f57d 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -148,9 +148,12 @@ struct _GstRtpJitterBufferPrivate guint32 next_seqnum; /* last output time */ GstClockTime last_out_time; + GstClockTime last_out_pts; /* the next expected seqnum we receive */ guint32 next_in_seqnum; + GArray *timers; + /* start and stop ranges */ GstClockTime npt_start; GstClockTime npt_stop; @@ -158,7 +161,6 @@ struct _GstRtpJitterBufferPrivate guint64 last_elapsed; guint64 estimated_eos; GstClockID eos_id; - gboolean reached_npt_stop; /* state */ gboolean eos; @@ -188,6 +190,23 @@ struct _GstRtpJitterBufferPrivate guint64 num_duplicates; }; +typedef enum +{ + TIMER_TYPE_EXPECTED, + TIMER_TYPE_LOST, + TIMER_TYPE_DEADLINE, + TIMER_TYPE_EOS +} TimerType; + +typedef struct +{ + guint idx; + guint16 seqnum; + TimerType type; + GstClockTime pts; + GstClockTime timeout; +} TimerData; + #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \ (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \ GstRtpJitterBufferPrivate)) @@ -276,6 +295,7 @@ static GstClockTime gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer, gboolean active, guint64 base_time); static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer); +static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer); static void gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) @@ -466,6 +486,7 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer) priv->latency_ns = priv->latency_ms * GST_MSECOND; priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY; priv->do_lost = DEFAULT_DO_LOST; + priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData)); priv->jbuf = rtp_jitter_buffer_new (); g_mutex_init (&priv->jbuf_lock); @@ -512,6 +533,7 @@ gst_rtp_jitter_buffer_finalize (GObject * object) jitterbuffer = GST_RTP_JITTER_BUFFER (object); + g_array_free (jitterbuffer->priv->timers, TRUE); g_mutex_clear (&jitterbuffer->priv->jbuf_lock); g_cond_clear (&jitterbuffer->priv->jbuf_cond); @@ -876,17 +898,18 @@ gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer) gst_segment_init (&priv->segment, GST_FORMAT_TIME); priv->last_popped_seqnum = -1; priv->last_out_time = -1; + priv->last_out_pts = -1; priv->next_seqnum = -1; priv->next_in_seqnum = -1; priv->clock_rate = -1; priv->eos = FALSE; priv->estimated_eos = -1; priv->last_elapsed = 0; - priv->reached_npt_stop = FALSE; priv->ext_timestamp = -1; GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer"); rtp_jitter_buffer_flush (priv->jbuf); rtp_jitter_buffer_reset_skew (priv->jbuf); + remove_all_timers (jitterbuffer); JBUF_UNLOCK (priv); } @@ -1347,6 +1370,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer"); rtp_jitter_buffer_flush (priv->jbuf); rtp_jitter_buffer_reset_skew (priv->jbuf); + remove_all_timers (jitterbuffer); priv->last_popped_seqnum = -1; priv->next_seqnum = seqnum; } @@ -1407,8 +1431,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, /* let's unschedule and unblock any waiting buffers. We only want to do this * when the tail buffer changed */ if (G_UNLIKELY (priv->clock_id && tail)) { - GST_DEBUG_OBJECT (jitterbuffer, - "Unscheduling waiting buffer, new tail buffer"); + GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer"); gst_clock_id_unschedule (priv->clock_id); priv->unscheduled = TRUE; } @@ -1508,30 +1531,78 @@ get_sync_time (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp) return result; } -static gboolean -eos_reached (GstClock * clock, GstClockTime time, GstClockID id, - GstRtpJitterBuffer * jitterbuffer) +#define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS + +static TimerData * +find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, + guint16 seqnum, gboolean * created) { - GstRtpJitterBufferPrivate *priv; + GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; + TimerData *timer; + gint i, len; + gboolean found = FALSE; + + len = priv->timers->len; + for (i = 0; i < len; i++) { + timer = &g_array_index (priv->timers, TimerData, i); + if (timer->seqnum == seqnum && timer->type == type) { + found = TRUE; + break; + } + } + if (!found) { + /* not found, create */ + g_array_set_size (priv->timers, len + 1); + timer = &g_array_index (priv->timers, TimerData, len); + timer->idx = len; + timer->type = type; + timer->seqnum = seqnum; + } + if (created) + *created = !found; - priv = jitterbuffer->priv; + return timer; +} - JBUF_LOCK_CHECK (priv, flushing); - if (priv->waiting) { - GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout"); - priv->reached_npt_stop = TRUE; - JBUF_SIGNAL (priv); - } - JBUF_UNLOCK (priv); +static GstFlowReturn +set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, + guint16 seqnum, GstClockTime pts) +{ + TimerData *timer; + GstClockTime out_time; - return TRUE; + out_time = apply_offset (jitterbuffer, pts); - /* ERRORS */ -flushing: - { - JBUF_UNLOCK (priv); - return FALSE; - } + GST_DEBUG_OBJECT (jitterbuffer, + "set timer for seqnum %d to %" GST_TIME_FORMAT, seqnum, + GST_TIME_ARGS (out_time)); + + /* find the seqnum timer */ + timer = find_timer (jitterbuffer, type, seqnum, NULL); + timer->timeout = out_time; + timer->pts = pts; + + return GST_FLOW_WAIT; +} + +static void +remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer) +{ + GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; + guint idx; + + idx = timer->idx; + GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx); + g_array_remove_index_fast (priv->timers, idx); + timer->idx = idx; +} + +static void +remove_all_timers (GstRtpJitterBuffer * jitterbuffer) +{ + GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; + GST_DEBUG_OBJECT (jitterbuffer, "removed all timers"); + g_array_set_size (priv->timers, 0); } static GstClockTime @@ -1602,226 +1673,326 @@ update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf) GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %" GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated)); - priv->estimated_eos = estimated; + if (estimated != -1 && priv->estimated_eos != estimated) { + set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated); + priv->estimated_eos = estimated; + } } } } -/* - * This funcion will push out buffers on the source pad. - * - * For each pushed buffer, the seqnum is recorded, if the next buffer B has a - * different seqnum (missing packets before B), this function will wait for the - * missing packet to arrive up to the timestamp of buffer B. - */ -static void -gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer) +/* take a buffer from the queue and push it */ +static GstFlowReturn +pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum, + GstClockTime pts) { - GstRtpJitterBufferPrivate *priv; - GstBuffer *outbuf; + GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; GstFlowReturn result; - guint16 seqnum; - guint32 next_seqnum; - GstClockTime timestamp, out_time; - gboolean discont = FALSE; - gint gap; - GstClock *clock; - GstClockID id; - GstClockTime sync_time; + GstBuffer *outbuf; gint percent = -1; - GstRTPBuffer rtp = { NULL, }; + GstClockTime out_time; - priv = jitterbuffer->priv; + /* when we get here we are ready to pop and push the buffer */ + outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent); - JBUF_LOCK_CHECK (priv, flushing); -again: - GST_DEBUG_OBJECT (jitterbuffer, "Peeking item"); - while (TRUE) { - id = NULL; - /* always wait if we are blocked */ - if (G_LIKELY (!priv->blocked)) { - /* we're buffering but not EOS, wait. */ - if (!priv->eos && (!priv->active - || rtp_jitter_buffer_is_buffering (priv->jbuf))) { - GstClockTime elapsed, delay, left; - - if (priv->estimated_eos == -1) - goto do_wait; - - outbuf = rtp_jitter_buffer_peek (priv->jbuf); - if (outbuf != NULL) { - elapsed = compute_elapsed (jitterbuffer, outbuf); - if (GST_BUFFER_DURATION_IS_VALID (outbuf)) - elapsed += GST_BUFFER_DURATION (outbuf); - } else { - GST_INFO_OBJECT (jitterbuffer, "no buffer, using last_elapsed"); - elapsed = priv->last_elapsed; - } + check_buffering_percent (jitterbuffer, &percent); - delay = rtp_jitter_buffer_get_delay (priv->jbuf); + if (G_UNLIKELY (priv->discont)) { + /* set DISCONT flag when we missed a packet. We pushed the buffer writable + * into the jitterbuffer so we can modify now. */ + GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT); + priv->discont = FALSE; + } + if (G_UNLIKELY (priv->ts_discont)) { + GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC); + priv->ts_discont = FALSE; + } - if (priv->estimated_eos > elapsed) - left = priv->estimated_eos - elapsed; - else - left = 0; - - GST_INFO_OBJECT (jitterbuffer, "buffering, elapsed %" GST_TIME_FORMAT - " estimated_eos %" GST_TIME_FORMAT " left %" GST_TIME_FORMAT - " delay %" GST_TIME_FORMAT, - GST_TIME_ARGS (elapsed), GST_TIME_ARGS (priv->estimated_eos), - GST_TIME_ARGS (left), GST_TIME_ARGS (delay)); - if (left > delay) - goto do_wait; - } - /* if we have a packet, we can exit the loop and grab it */ - if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0) - break; - /* no packets but we are EOS, do eos logic */ - if (G_UNLIKELY (priv->eos)) - goto do_eos; - /* underrun, wait for packets or flushing now if we are expecting an EOS - * timeout, set the async timer for it too */ - if (priv->estimated_eos != -1 && !priv->reached_npt_stop) { - sync_time = get_sync_time (jitterbuffer, priv->estimated_eos); - - GST_OBJECT_LOCK (jitterbuffer); - clock = GST_ELEMENT_CLOCK (jitterbuffer); - if (clock) { - GST_INFO_OBJECT (jitterbuffer, "scheduling timeout"); - id = gst_clock_new_single_shot_id (clock, sync_time); - gst_clock_id_wait_async (id, (GstClockCallback) eos_reached, - jitterbuffer, NULL); - } - GST_OBJECT_UNLOCK (jitterbuffer); - } - } - do_wait: - /* now we wait */ - GST_DEBUG_OBJECT (jitterbuffer, "waiting"); - priv->waiting = TRUE; - JBUF_WAIT (priv); - priv->waiting = FALSE; - GST_DEBUG_OBJECT (jitterbuffer, "waiting done"); + /* apply timestamp with offset to buffer now */ + out_time = apply_offset (jitterbuffer, pts); + GST_BUFFER_PTS (outbuf) = out_time; + GST_BUFFER_DTS (outbuf) = out_time; - if (id) { - /* unschedule any pending async notifications we might have */ - gst_clock_id_unschedule (id); - gst_clock_id_unref (id); - } - if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) - goto flushing; + /* update the elapsed time when we need to check against the npt stop time. */ + update_estimated_eos (jitterbuffer, outbuf); - if (id && priv->reached_npt_stop) { - goto do_npt_stop; - } + /* now we are ready to push the buffer. Save the seqnum and release the lock + * so the other end can push stuff in the queue again. */ + priv->last_popped_seqnum = seqnum; + priv->last_out_time = out_time; + priv->last_out_pts = pts; + priv->next_seqnum = (seqnum + 1) & 0xffff; + JBUF_UNLOCK (priv); + + if (percent != -1) + post_buffering_percent (jitterbuffer, percent); + + /* push buffer */ + GST_DEBUG_OBJECT (jitterbuffer, + "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum, + GST_TIME_ARGS (out_time)); + result = gst_pad_push (priv->srcpad, outbuf); + + JBUF_LOCK_CHECK (priv, out_flushing); + + return result; + + /* ERRORS */ +out_flushing: + { + return priv->srcresult; } +} + +static GstClockTime +estimate_pts (GstRtpJitterBuffer * jitterbuffer, GstClockTime pts, gint gap) +{ + GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; + GstClockTime duration; - /* peek a buffer, we're just looking at the timestamp and the sequence number. + if (pts == -1 || priv->last_out_pts == -1) + return pts; + + GST_DEBUG_OBJECT (jitterbuffer, + "pts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT, + GST_TIME_ARGS (pts), GST_TIME_ARGS (priv->last_out_pts)); + + /* interpolate between the current time and the last time based on + * number of packets we are missing, this is the estimated duration + * for the missing packet based on equidistant packet spacing. Also make + * sure we never go negative. */ + if (pts >= priv->last_out_pts) + duration = (pts - priv->last_out_pts) / (gap + 1); + else + /* packet already lost, timer will timeout quickly */ + duration = 0; + + GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT, + GST_TIME_ARGS (duration)); + + /* add this duration to the timestamp of the last packet we pushed */ + pts = (priv->last_out_pts + duration); + + return pts; +} + +/* Peek a buffer and compare the seqnum to the expected seqnum. + * If all is fine, the buffer is pushed. + * If something is wrong, a timeout is set. We set 2 kinds of timeouts: + * * deadline: to the ultimate time we can still push the packet. We + * do this for the first packet to make sure we have the previous + * packets. + * * lost: the ultimate time we can receive a packet before we have + * to consider it lost. We estimate this based on the packet + * spacing. + */ +static GstFlowReturn +handle_next_buffer (GstRtpJitterBuffer * jitterbuffer) +{ + GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; + GstFlowReturn result = GST_FLOW_OK; + GstBuffer *outbuf; + guint16 seqnum; + GstClockTime pts; + guint32 next_seqnum; + gint gap; + GstRTPBuffer rtp = { NULL, }; + +again: + /* peek a buffer, we're just looking at the sequence number. * If all is fine, we'll pop and push it. If the sequence number is wrong we * wait on the timestamp. In the chain function we will unlock the wait when a * new buffer is available. The peeked buffer is valid for as long as we hold * the jitterbuffer lock. */ outbuf = rtp_jitter_buffer_peek (priv->jbuf); + if (outbuf == NULL) + goto wait; /* get the seqnum and the next expected seqnum */ gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp); seqnum = gst_rtp_buffer_get_seq (&rtp); gst_rtp_buffer_unmap (&rtp); + next_seqnum = priv->next_seqnum; /* get the timestamp, this is already corrected for clock skew by the * jitterbuffer */ - timestamp = GST_BUFFER_TIMESTAMP (outbuf); - - GST_DEBUG_OBJECT (jitterbuffer, - "Peeked buffer #%d, expect #%d, timestamp %" GST_TIME_FORMAT - ", now %d left", seqnum, next_seqnum, GST_TIME_ARGS (timestamp), - rtp_jitter_buffer_num_packets (priv->jbuf)); - - /* apply our timestamp offset to the incomming buffer, this will be our output - * timestamp. */ - out_time = apply_offset (jitterbuffer, timestamp); + pts = GST_BUFFER_PTS (outbuf); /* get the gap between this and the previous packet. If we don't know the * previous packet seqnum assume no gap. */ - if (G_LIKELY (next_seqnum != -1)) { + if (G_UNLIKELY (next_seqnum == -1)) { + GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum); + /* we don't know what the next_seqnum should be, wait for the last + * possible moment to push this buffer, maybe we get an earlier seqnum + * while we wait */ + result = set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, pts); + } else { + /* else calculate GAP */ gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum); - /* if we have a packet that we already pushed or considered dropped, pop it - * off and get the next packet */ - if (G_UNLIKELY (gap < 0)) { + if (G_LIKELY (gap == 0)) { + /* no missing packet, pop and push */ + result = pop_and_push_next (jitterbuffer, seqnum, pts); + } else if (G_UNLIKELY (gap < 0)) { + /* if we have a packet that we already pushed or considered dropped, pop it + * off and get the next packet */ GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping", seqnum, next_seqnum); - outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent); + outbuf = rtp_jitter_buffer_pop (priv->jbuf, NULL); gst_buffer_unref (outbuf); goto again; + } else { + GST_DEBUG_OBJECT (jitterbuffer, + "Sequence number GAP detected: expected %d instead of %d (%d missing)", + next_seqnum, seqnum, gap); + /* packet missing, estimate when we should ultimately push this packet */ + pts = estimate_pts (jitterbuffer, pts, gap); + /* and set a timer for it */ + result = set_timer (jitterbuffer, TIMER_TYPE_LOST, next_seqnum, pts); } - } else { - GST_DEBUG_OBJECT (jitterbuffer, "no next seqnum known, first packet"); - gap = -1; } + return result; - /* If we don't know what the next seqnum should be (== -1) we have to wait - * because it might be possible that we are not receiving this buffer in-order, - * a buffer with a lower seqnum could arrive later and we want to push that - * earlier buffer before this buffer then. - * If we know the expected seqnum, we can compare it to the current seqnum to - * determine if we have missing a packet. If we have a missing packet (which - * must be before this packet) we can wait for it until the deadline for this - * packet expires. */ - if (G_UNLIKELY (gap != 0 && out_time != -1)) { - GstClockReturn ret; - GstClockTime duration = GST_CLOCK_TIME_NONE; - GstClockTimeDiff clock_jitter; - guint32 lost_packets = 1; - gboolean lost_packets_late = FALSE; +wait: + { + GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait"); + return GST_FLOW_WAIT; + } +} - if (gap > 0) { - /* we have a gap */ - GST_DEBUG_OBJECT (jitterbuffer, - "Sequence number GAP detected: expected %d instead of %d (%d missing)", - next_seqnum, seqnum, gap); +/* a packet is lost */ +static GstFlowReturn +do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, + GstClockTimeDiff clock_jitter) +{ + GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; + GstClockTime duration = GST_CLOCK_TIME_NONE; + guint32 lost_packets = 1; + gboolean lost_packets_late = FALSE; - if (priv->last_out_time != -1) { - GST_DEBUG_OBJECT (jitterbuffer, - "out_time %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT, - GST_TIME_ARGS (out_time), GST_TIME_ARGS (priv->last_out_time)); - /* interpolate between the current time and the last time based on - * number of packets we are missing, this is the estimated duration - * for the missing packet based on equidistant packet spacing. Also make - * sure we never go negative. */ - if (out_time >= priv->last_out_time) - duration = (out_time - priv->last_out_time) / (gap + 1); - else - goto lost; +#if 0 + if (clock_jitter > 0 + && clock_jitter > (priv->latency_ns + priv->peer_latency)) { + GstClockTimeDiff total_duration; + GstClockTime out_time_diff; - GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT, - GST_TIME_ARGS (duration)); - /* add this duration to the timestamp of the last packet we pushed */ - out_time = (priv->last_out_time + duration); - } - } else { - /* we don't know what the next_seqnum should be, wait for the last - * possible moment to push this buffer, maybe we get an earlier seqnum - * while we wait */ - GST_DEBUG_OBJECT (jitterbuffer, "First buffer %d, do sync", seqnum); - } + out_time_diff = apply_offset (jitterbuffer, timer->pts) - timer->timeout; + total_duration = MIN (out_time_diff, clock_jitter); + + if (duration > 0) + lost_packets = total_duration / duration; + else + lost_packets = gap; + total_duration = lost_packets * duration; + + GST_DEBUG_OBJECT (jitterbuffer, + "Current sync_time has expired a long time ago (+%" GST_TIME_FORMAT + ") Cover up %d lost packets with duration %" GST_TIME_FORMAT, + GST_TIME_ARGS (clock_jitter), + lost_packets, GST_TIME_ARGS (total_duration)); + + duration = total_duration; + lost_packets_late = TRUE; + } +#endif + + /* we had a gap and thus we lost some packets. Create an event for this. */ + if (lost_packets > 1) + GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", timer->seqnum, + timer->seqnum + lost_packets - 1); + else + GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", timer->seqnum); + + priv->num_late += lost_packets; + priv->discont = TRUE; + + /* update our expected next packet */ + priv->last_popped_seqnum = timer->seqnum; + priv->last_out_time = timer->timeout; + priv->last_out_pts = timer->pts; + priv->next_seqnum = (timer->seqnum + lost_packets) & 0xffff; + /* remove timer now */ + remove_timer (jitterbuffer, timer); + + if (priv->do_lost) { + GstEvent *event; + + /* create paket lost event */ + event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM, + gst_structure_new ("GstRTPPacketLost", + "seqnum", G_TYPE_UINT, (guint) priv->last_popped_seqnum, + "timestamp", G_TYPE_UINT64, priv->last_out_time, + "duration", G_TYPE_UINT64, duration, + "late", G_TYPE_BOOLEAN, lost_packets_late, NULL)); + JBUF_UNLOCK (priv); + gst_pad_push_event (priv->srcpad, event); + JBUF_LOCK_CHECK (priv, flushing); + } + return GST_FLOW_OK; + + /* ERRORS */ +flushing: + { + GST_DEBUG_OBJECT (jitterbuffer, "we are flushing"); + return priv->srcresult; + } +} + +static GstFlowReturn +do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer) +{ + GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout"); + remove_timer (jitterbuffer, timer); + + return GST_FLOW_EOS; +} + +/* called when we need to wait for the next timeout. + * + * We loop over the array of recorded timeouts and wait for the earliest one. + * When it timed out, do the logic associated with the timer. + * + * If there are no timers, we wait on a gcond until something new happens. + */ +static GstFlowReturn +wait_next_timeout (GstRtpJitterBuffer * jitterbuffer) +{ + GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; + GstFlowReturn result = GST_FLOW_OK; + gint i, len; + TimerData *timer = NULL; + + len = priv->timers->len; + for (i = 0; i < len; i++) { + TimerData *test = &g_array_index (priv->timers, TimerData, i); + + /* find the smallest timeout */ + if (timer == NULL || test->timeout == -1 || test->timeout < timer->timeout) + timer = test; + } + if (timer) { + GstClock *clock; + GstClockTime sync_time; + GstClockID id; + GstClockReturn ret; + GstClockTimeDiff clock_jitter; GST_OBJECT_LOCK (jitterbuffer); clock = GST_ELEMENT_CLOCK (jitterbuffer); if (!clock) { GST_OBJECT_UNLOCK (jitterbuffer); /* let's just push if there is no clock */ - GST_DEBUG_OBJECT (jitterbuffer, "No clock, push right away"); - goto push_buffer; + GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away"); + goto do_timeout; } /* prepare for sync against clock */ - sync_time = get_sync_time (jitterbuffer, out_time); + sync_time = get_sync_time (jitterbuffer, timer->timeout); GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT " with sync time %" GST_TIME_FORMAT, - GST_TIME_ARGS (out_time), GST_TIME_ARGS (sync_time)); + GST_TIME_ARGS (timer->timeout), GST_TIME_ARGS (sync_time)); /* create an entry for the clock */ id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time); @@ -1834,35 +2005,12 @@ again: ret = gst_clock_id_wait (id, &clock_jitter); JBUF_LOCK (priv); - GST_DEBUG_OBJECT (jitterbuffer, "sync done"); + GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, %" G_GINT64_FORMAT, + ret, clock_jitter); /* and free the entry */ gst_clock_id_unref (id); priv->clock_id = NULL; - if (ret == GST_CLOCK_EARLY && gap > 0 - && clock_jitter > (priv->latency_ns + priv->peer_latency)) { - GstClockTimeDiff total_duration; - GstClockTime out_time_diff; - - out_time_diff = apply_offset (jitterbuffer, timestamp) - out_time; - total_duration = MIN (out_time_diff, clock_jitter); - - if (duration > 0) - lost_packets = total_duration / duration; - else - lost_packets = gap; - total_duration = lost_packets * duration; - - GST_DEBUG_OBJECT (jitterbuffer, - "Current sync_time has expired a long time ago (+%" GST_TIME_FORMAT - ") Cover up %d lost packets with duration %" GST_TIME_FORMAT, - GST_TIME_ARGS (clock_jitter), - lost_packets, GST_TIME_ARGS (total_duration)); - - duration = total_duration; - lost_packets_late = TRUE; - } - /* at this point, the clock could have been unlocked by a timeout, a new * tail element was added to the queue or because we are shutting down. Check * for shutdown first. */ @@ -1874,144 +2022,99 @@ again: * element became available in the queue or we flushed the queue. * Grab it and try to push or sync. */ if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) { - GST_DEBUG_OBJECT (jitterbuffer, - "Wait got unscheduled, will retry to push with new buffer"); - goto again; + GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled"); + goto done; } - - lost: - /* we now timed out, this means we lost a packet or finished synchronizing - * on the first buffer. */ - if (gap > 0) { - GstEvent *event; - - /* we had a gap and thus we lost some packets. Create an event for this. */ - if (lost_packets > 1) - GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", next_seqnum, - next_seqnum + lost_packets - 1); - else - GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", next_seqnum); - - priv->num_late += lost_packets; - discont = TRUE; - - /* update our expected next packet */ - priv->last_popped_seqnum = next_seqnum; - priv->last_out_time += duration; - priv->next_seqnum = (next_seqnum + lost_packets) & 0xffff; - - if (priv->do_lost) { - /* create paket lost event */ - event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM, - gst_structure_new ("GstRTPPacketLost", - "seqnum", G_TYPE_UINT, (guint) next_seqnum, - "timestamp", G_TYPE_UINT64, out_time, - "duration", G_TYPE_UINT64, duration, - "late", G_TYPE_BOOLEAN, lost_packets_late, NULL)); - JBUF_UNLOCK (priv); - gst_pad_push_event (priv->srcpad, event); - JBUF_LOCK_CHECK (priv, flushing); - } - /* look for next packet */ - goto again; + do_timeout: + switch (timer->type) { + case TIMER_TYPE_EXPECTED: + remove_timer (jitterbuffer, timer); + break; + case TIMER_TYPE_LOST: + result = do_lost_timeout (jitterbuffer, timer, clock_jitter); + break; + case TIMER_TYPE_DEADLINE: + priv->next_seqnum = timer->seqnum; + remove_timer (jitterbuffer, timer); + break; + case TIMER_TYPE_EOS: + result = do_eos_timeout (jitterbuffer, timer); + break; } - - /* there was no known gap,just the first packet, exit the loop and push */ - GST_DEBUG_OBJECT (jitterbuffer, "First packet #%d synced", seqnum); - - /* get new timestamp, latency might have changed */ - out_time = apply_offset (jitterbuffer, timestamp); + } else { + /* no timers, wait for activity */ + GST_DEBUG_OBJECT (jitterbuffer, "waiting"); + priv->waiting = TRUE; + JBUF_WAIT_CHECK (priv, flushing); + priv->waiting = FALSE; + GST_DEBUG_OBJECT (jitterbuffer, "waiting done"); } -push_buffer: - - /* when we get here we are ready to pop and push the buffer */ - outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent); - check_buffering_percent (jitterbuffer, &percent); +done: + return result; - if (G_UNLIKELY (discont || priv->discont)) { - /* set DISCONT flag when we missed a packet. We pushed the buffer writable - * into the jitterbuffer so we can modify now. */ - GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT); - priv->discont = FALSE; - } - if (G_UNLIKELY (priv->ts_discont)) { - GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC); - priv->ts_discont = FALSE; +flushing: + { + GST_DEBUG_OBJECT (jitterbuffer, "we are flushing"); + return priv->srcresult; } +} - /* apply timestamp with offset to buffer now */ - GST_BUFFER_PTS (outbuf) = out_time; - GST_BUFFER_DTS (outbuf) = out_time; - - /* update the elapsed time when we need to check against the npt stop time. */ - update_estimated_eos (jitterbuffer, outbuf); +/* + * This funcion implements the main pushing loop on the source pad. + * + * It first tries to push as many buffers as possible. If there is a seqnum + * mismatch, a timeout is created and this function goes waiting for the + * next timeout. + */ +static void +gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer) +{ + GstRtpJitterBufferPrivate *priv; + GstFlowReturn result; - /* now we are ready to push the buffer. Save the seqnum and release the lock - * so the other end can push stuff in the queue again. */ - priv->last_popped_seqnum = seqnum; - priv->last_out_time = out_time; - priv->next_seqnum = (seqnum + 1) & 0xffff; - JBUF_UNLOCK (priv); + priv = jitterbuffer->priv; - if (percent != -1) - post_buffering_percent (jitterbuffer, percent); + JBUF_LOCK_CHECK (priv, flushing); + while (TRUE) { + GST_DEBUG_OBJECT (jitterbuffer, "Peeking item"); - /* push buffer */ - GST_DEBUG_OBJECT (jitterbuffer, - "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum, - GST_TIME_ARGS (out_time)); - result = gst_pad_push (priv->srcpad, outbuf); - if (G_UNLIKELY (result != GST_FLOW_OK)) - goto pause; + result = handle_next_buffer (jitterbuffer); + if (result == GST_FLOW_WAIT) { + /* now wait for the next event */ + result = wait_next_timeout (jitterbuffer); + } + if (result != GST_FLOW_OK) + break; + } + JBUF_UNLOCK (priv); - return; + /* if we get here we need to pause */ + goto pause; /* ERRORS */ -do_eos: - { - /* store result, we are flushing now */ - GST_DEBUG_OBJECT (jitterbuffer, "We are EOS, pushing EOS downstream"); - priv->srcresult = GST_FLOW_EOS; - gst_pad_pause_task (priv->srcpad); - JBUF_UNLOCK (priv); - gst_pad_push_event (priv->srcpad, gst_event_new_eos ()); - return; - } -do_npt_stop: - { - /* store result, we are flushing now */ - GST_DEBUG_OBJECT (jitterbuffer, "We reached the NPT stop"); - JBUF_UNLOCK (priv); - - g_signal_emit (jitterbuffer, - gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP], 0, NULL); - return; - } flushing: { - GST_DEBUG_OBJECT (jitterbuffer, "we are flushing"); - gst_pad_pause_task (priv->srcpad); + result = priv->srcresult; JBUF_UNLOCK (priv); - return; + goto pause; } pause: { - GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", - gst_flow_get_name (result)); + const gchar *reason = gst_flow_get_name (result); + GstEvent *event; - JBUF_LOCK (priv); - /* store result */ - priv->srcresult = result; - /* we don't post errors or anything because upstream will do that for us - * when we pass the return value upstream. */ + GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason); gst_pad_pause_task (priv->srcpad); - JBUF_UNLOCK (priv); + if (result == GST_FLOW_EOS) { + event = gst_event_new_eos (); + gst_pad_push_event (priv->srcpad, event); + } return; } } -/* collect the info form the lastest RTCP packet and the jittebuffer sync, do +/* collect the info from the lastest RTCP packet and the jitterbuffer sync, do * some sanity checks and then emit the handle-sync signal with the parameters. * This function must be called with the LOCK */ static void