guint32 next_seqnum;
/* last output time */
GstClockTime last_out_time;
+ GstClockTime last_out_pts;
/* the next expected seqnum we receive */
guint32 next_in_seqnum;
+ GArray *timers;
+
/* start and stop ranges */
GstClockTime npt_start;
GstClockTime npt_stop;
guint64 last_elapsed;
guint64 estimated_eos;
GstClockID eos_id;
- gboolean reached_npt_stop;
/* state */
gboolean eos;
guint64 num_duplicates;
};
+typedef enum
+{
+ TIMER_TYPE_EXPECTED,
+ TIMER_TYPE_LOST,
+ TIMER_TYPE_DEADLINE,
+ TIMER_TYPE_EOS
+} TimerType;
+
+typedef struct
+{
+ guint idx;
+ guint16 seqnum;
+ TimerType type;
+ GstClockTime pts;
+ GstClockTime timeout;
+} TimerData;
+
#define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
(G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
GstRtpJitterBufferPrivate))
gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
gboolean active, guint64 base_time);
static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
+static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
static void
gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
priv->latency_ns = priv->latency_ms * GST_MSECOND;
priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
priv->do_lost = DEFAULT_DO_LOST;
+ priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
priv->jbuf = rtp_jitter_buffer_new ();
g_mutex_init (&priv->jbuf_lock);
jitterbuffer = GST_RTP_JITTER_BUFFER (object);
+ g_array_free (jitterbuffer->priv->timers, TRUE);
g_mutex_clear (&jitterbuffer->priv->jbuf_lock);
g_cond_clear (&jitterbuffer->priv->jbuf_cond);
gst_segment_init (&priv->segment, GST_FORMAT_TIME);
priv->last_popped_seqnum = -1;
priv->last_out_time = -1;
+ priv->last_out_pts = -1;
priv->next_seqnum = -1;
priv->next_in_seqnum = -1;
priv->clock_rate = -1;
priv->eos = FALSE;
priv->estimated_eos = -1;
priv->last_elapsed = 0;
- priv->reached_npt_stop = FALSE;
priv->ext_timestamp = -1;
GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
rtp_jitter_buffer_flush (priv->jbuf);
rtp_jitter_buffer_reset_skew (priv->jbuf);
+ remove_all_timers (jitterbuffer);
JBUF_UNLOCK (priv);
}
GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
rtp_jitter_buffer_flush (priv->jbuf);
rtp_jitter_buffer_reset_skew (priv->jbuf);
+ remove_all_timers (jitterbuffer);
priv->last_popped_seqnum = -1;
priv->next_seqnum = seqnum;
}
/* let's unschedule and unblock any waiting buffers. We only want to do this
* when the tail buffer changed */
if (G_UNLIKELY (priv->clock_id && tail)) {
- GST_DEBUG_OBJECT (jitterbuffer,
- "Unscheduling waiting buffer, new tail buffer");
+ GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
gst_clock_id_unschedule (priv->clock_id);
priv->unscheduled = TRUE;
}
return result;
}
-static gboolean
-eos_reached (GstClock * clock, GstClockTime time, GstClockID id,
- GstRtpJitterBuffer * jitterbuffer)
+#define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
+
+static TimerData *
+find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
+ guint16 seqnum, gboolean * created)
{
- GstRtpJitterBufferPrivate *priv;
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ TimerData *timer;
+ gint i, len;
+ gboolean found = FALSE;
+
+ len = priv->timers->len;
+ for (i = 0; i < len; i++) {
+ timer = &g_array_index (priv->timers, TimerData, i);
+ if (timer->seqnum == seqnum && timer->type == type) {
+ found = TRUE;
+ break;
+ }
+ }
+ if (!found) {
+ /* not found, create */
+ g_array_set_size (priv->timers, len + 1);
+ timer = &g_array_index (priv->timers, TimerData, len);
+ timer->idx = len;
+ timer->type = type;
+ timer->seqnum = seqnum;
+ }
+ if (created)
+ *created = !found;
- priv = jitterbuffer->priv;
+ return timer;
+}
- JBUF_LOCK_CHECK (priv, flushing);
- if (priv->waiting) {
- GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
- priv->reached_npt_stop = TRUE;
- JBUF_SIGNAL (priv);
- }
- JBUF_UNLOCK (priv);
+static GstFlowReturn
+set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
+ guint16 seqnum, GstClockTime pts)
+{
+ TimerData *timer;
+ GstClockTime out_time;
- return TRUE;
+ out_time = apply_offset (jitterbuffer, pts);
- /* ERRORS */
-flushing:
- {
- JBUF_UNLOCK (priv);
- return FALSE;
- }
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "set timer for seqnum %d to %" GST_TIME_FORMAT, seqnum,
+ GST_TIME_ARGS (out_time));
+
+ /* find the seqnum timer */
+ timer = find_timer (jitterbuffer, type, seqnum, NULL);
+ timer->timeout = out_time;
+ timer->pts = pts;
+
+ return GST_FLOW_WAIT;
+}
+
+static void
+remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ guint idx;
+
+ idx = timer->idx;
+ GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
+ g_array_remove_index_fast (priv->timers, idx);
+ timer->idx = idx;
+}
+
+static void
+remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
+ g_array_set_size (priv->timers, 0);
}
static GstClockTime
GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
- priv->estimated_eos = estimated;
+ if (estimated != -1 && priv->estimated_eos != estimated) {
+ set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
+ priv->estimated_eos = estimated;
+ }
}
}
}
-/*
- * This funcion will push out buffers on the source pad.
- *
- * For each pushed buffer, the seqnum is recorded, if the next buffer B has a
- * different seqnum (missing packets before B), this function will wait for the
- * missing packet to arrive up to the timestamp of buffer B.
- */
-static void
-gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
+/* take a buffer from the queue and push it */
+static GstFlowReturn
+pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
+ GstClockTime pts)
{
- GstRtpJitterBufferPrivate *priv;
- GstBuffer *outbuf;
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstFlowReturn result;
- guint16 seqnum;
- guint32 next_seqnum;
- GstClockTime timestamp, out_time;
- gboolean discont = FALSE;
- gint gap;
- GstClock *clock;
- GstClockID id;
- GstClockTime sync_time;
+ GstBuffer *outbuf;
gint percent = -1;
- GstRTPBuffer rtp = { NULL, };
+ GstClockTime out_time;
- priv = jitterbuffer->priv;
+ /* when we get here we are ready to pop and push the buffer */
+ outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
- JBUF_LOCK_CHECK (priv, flushing);
-again:
- GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
- while (TRUE) {
- id = NULL;
- /* always wait if we are blocked */
- if (G_LIKELY (!priv->blocked)) {
- /* we're buffering but not EOS, wait. */
- if (!priv->eos && (!priv->active
- || rtp_jitter_buffer_is_buffering (priv->jbuf))) {
- GstClockTime elapsed, delay, left;
-
- if (priv->estimated_eos == -1)
- goto do_wait;
-
- outbuf = rtp_jitter_buffer_peek (priv->jbuf);
- if (outbuf != NULL) {
- elapsed = compute_elapsed (jitterbuffer, outbuf);
- if (GST_BUFFER_DURATION_IS_VALID (outbuf))
- elapsed += GST_BUFFER_DURATION (outbuf);
- } else {
- GST_INFO_OBJECT (jitterbuffer, "no buffer, using last_elapsed");
- elapsed = priv->last_elapsed;
- }
+ check_buffering_percent (jitterbuffer, &percent);
- delay = rtp_jitter_buffer_get_delay (priv->jbuf);
+ if (G_UNLIKELY (priv->discont)) {
+ /* set DISCONT flag when we missed a packet. We pushed the buffer writable
+ * into the jitterbuffer so we can modify now. */
+ GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
+ priv->discont = FALSE;
+ }
+ if (G_UNLIKELY (priv->ts_discont)) {
+ GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
+ priv->ts_discont = FALSE;
+ }
- if (priv->estimated_eos > elapsed)
- left = priv->estimated_eos - elapsed;
- else
- left = 0;
-
- GST_INFO_OBJECT (jitterbuffer, "buffering, elapsed %" GST_TIME_FORMAT
- " estimated_eos %" GST_TIME_FORMAT " left %" GST_TIME_FORMAT
- " delay %" GST_TIME_FORMAT,
- GST_TIME_ARGS (elapsed), GST_TIME_ARGS (priv->estimated_eos),
- GST_TIME_ARGS (left), GST_TIME_ARGS (delay));
- if (left > delay)
- goto do_wait;
- }
- /* if we have a packet, we can exit the loop and grab it */
- if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0)
- break;
- /* no packets but we are EOS, do eos logic */
- if (G_UNLIKELY (priv->eos))
- goto do_eos;
- /* underrun, wait for packets or flushing now if we are expecting an EOS
- * timeout, set the async timer for it too */
- if (priv->estimated_eos != -1 && !priv->reached_npt_stop) {
- sync_time = get_sync_time (jitterbuffer, priv->estimated_eos);
-
- GST_OBJECT_LOCK (jitterbuffer);
- clock = GST_ELEMENT_CLOCK (jitterbuffer);
- if (clock) {
- GST_INFO_OBJECT (jitterbuffer, "scheduling timeout");
- id = gst_clock_new_single_shot_id (clock, sync_time);
- gst_clock_id_wait_async (id, (GstClockCallback) eos_reached,
- jitterbuffer, NULL);
- }
- GST_OBJECT_UNLOCK (jitterbuffer);
- }
- }
- do_wait:
- /* now we wait */
- GST_DEBUG_OBJECT (jitterbuffer, "waiting");
- priv->waiting = TRUE;
- JBUF_WAIT (priv);
- priv->waiting = FALSE;
- GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
+ /* apply timestamp with offset to buffer now */
+ out_time = apply_offset (jitterbuffer, pts);
+ GST_BUFFER_PTS (outbuf) = out_time;
+ GST_BUFFER_DTS (outbuf) = out_time;
- if (id) {
- /* unschedule any pending async notifications we might have */
- gst_clock_id_unschedule (id);
- gst_clock_id_unref (id);
- }
- if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))
- goto flushing;
+ /* update the elapsed time when we need to check against the npt stop time. */
+ update_estimated_eos (jitterbuffer, outbuf);
- if (id && priv->reached_npt_stop) {
- goto do_npt_stop;
- }
+ /* now we are ready to push the buffer. Save the seqnum and release the lock
+ * so the other end can push stuff in the queue again. */
+ priv->last_popped_seqnum = seqnum;
+ priv->last_out_time = out_time;
+ priv->last_out_pts = pts;
+ priv->next_seqnum = (seqnum + 1) & 0xffff;
+ JBUF_UNLOCK (priv);
+
+ if (percent != -1)
+ post_buffering_percent (jitterbuffer, percent);
+
+ /* push buffer */
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
+ GST_TIME_ARGS (out_time));
+ result = gst_pad_push (priv->srcpad, outbuf);
+
+ JBUF_LOCK_CHECK (priv, out_flushing);
+
+ return result;
+
+ /* ERRORS */
+out_flushing:
+ {
+ return priv->srcresult;
}
+}
+
+static GstClockTime
+estimate_pts (GstRtpJitterBuffer * jitterbuffer, GstClockTime pts, gint gap)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ GstClockTime duration;
- /* peek a buffer, we're just looking at the timestamp and the sequence number.
+ if (pts == -1 || priv->last_out_pts == -1)
+ return pts;
+
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "pts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (pts), GST_TIME_ARGS (priv->last_out_pts));
+
+ /* interpolate between the current time and the last time based on
+ * number of packets we are missing, this is the estimated duration
+ * for the missing packet based on equidistant packet spacing. Also make
+ * sure we never go negative. */
+ if (pts >= priv->last_out_pts)
+ duration = (pts - priv->last_out_pts) / (gap + 1);
+ else
+ /* packet already lost, timer will timeout quickly */
+ duration = 0;
+
+ GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (duration));
+
+ /* add this duration to the timestamp of the last packet we pushed */
+ pts = (priv->last_out_pts + duration);
+
+ return pts;
+}
+
+/* Peek a buffer and compare the seqnum to the expected seqnum.
+ * If all is fine, the buffer is pushed.
+ * If something is wrong, a timeout is set. We set 2 kinds of timeouts:
+ * * deadline: to the ultimate time we can still push the packet. We
+ * do this for the first packet to make sure we have the previous
+ * packets.
+ * * lost: the ultimate time we can receive a packet before we have
+ * to consider it lost. We estimate this based on the packet
+ * spacing.
+ */
+static GstFlowReturn
+handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ GstFlowReturn result = GST_FLOW_OK;
+ GstBuffer *outbuf;
+ guint16 seqnum;
+ GstClockTime pts;
+ guint32 next_seqnum;
+ gint gap;
+ GstRTPBuffer rtp = { NULL, };
+
+again:
+ /* peek a buffer, we're just looking at the sequence number.
* If all is fine, we'll pop and push it. If the sequence number is wrong we
* wait on the timestamp. In the chain function we will unlock the wait when a
* new buffer is available. The peeked buffer is valid for as long as we hold
* the jitterbuffer lock. */
outbuf = rtp_jitter_buffer_peek (priv->jbuf);
+ if (outbuf == NULL)
+ goto wait;
/* get the seqnum and the next expected seqnum */
gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
seqnum = gst_rtp_buffer_get_seq (&rtp);
gst_rtp_buffer_unmap (&rtp);
+
next_seqnum = priv->next_seqnum;
/* get the timestamp, this is already corrected for clock skew by the
* jitterbuffer */
- timestamp = GST_BUFFER_TIMESTAMP (outbuf);
-
- GST_DEBUG_OBJECT (jitterbuffer,
- "Peeked buffer #%d, expect #%d, timestamp %" GST_TIME_FORMAT
- ", now %d left", seqnum, next_seqnum, GST_TIME_ARGS (timestamp),
- rtp_jitter_buffer_num_packets (priv->jbuf));
-
- /* apply our timestamp offset to the incomming buffer, this will be our output
- * timestamp. */
- out_time = apply_offset (jitterbuffer, timestamp);
+ pts = GST_BUFFER_PTS (outbuf);
/* get the gap between this and the previous packet. If we don't know the
* previous packet seqnum assume no gap. */
- if (G_LIKELY (next_seqnum != -1)) {
+ if (G_UNLIKELY (next_seqnum == -1)) {
+ GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
+ /* we don't know what the next_seqnum should be, wait for the last
+ * possible moment to push this buffer, maybe we get an earlier seqnum
+ * while we wait */
+ result = set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, pts);
+ } else {
+ /* else calculate GAP */
gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
- /* if we have a packet that we already pushed or considered dropped, pop it
- * off and get the next packet */
- if (G_UNLIKELY (gap < 0)) {
+ if (G_LIKELY (gap == 0)) {
+ /* no missing packet, pop and push */
+ result = pop_and_push_next (jitterbuffer, seqnum, pts);
+ } else if (G_UNLIKELY (gap < 0)) {
+ /* if we have a packet that we already pushed or considered dropped, pop it
+ * off and get the next packet */
GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
seqnum, next_seqnum);
- outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
+ outbuf = rtp_jitter_buffer_pop (priv->jbuf, NULL);
gst_buffer_unref (outbuf);
goto again;
+ } else {
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "Sequence number GAP detected: expected %d instead of %d (%d missing)",
+ next_seqnum, seqnum, gap);
+ /* packet missing, estimate when we should ultimately push this packet */
+ pts = estimate_pts (jitterbuffer, pts, gap);
+ /* and set a timer for it */
+ result = set_timer (jitterbuffer, TIMER_TYPE_LOST, next_seqnum, pts);
}
- } else {
- GST_DEBUG_OBJECT (jitterbuffer, "no next seqnum known, first packet");
- gap = -1;
}
+ return result;
- /* If we don't know what the next seqnum should be (== -1) we have to wait
- * because it might be possible that we are not receiving this buffer in-order,
- * a buffer with a lower seqnum could arrive later and we want to push that
- * earlier buffer before this buffer then.
- * If we know the expected seqnum, we can compare it to the current seqnum to
- * determine if we have missing a packet. If we have a missing packet (which
- * must be before this packet) we can wait for it until the deadline for this
- * packet expires. */
- if (G_UNLIKELY (gap != 0 && out_time != -1)) {
- GstClockReturn ret;
- GstClockTime duration = GST_CLOCK_TIME_NONE;
- GstClockTimeDiff clock_jitter;
- guint32 lost_packets = 1;
- gboolean lost_packets_late = FALSE;
+wait:
+ {
+ GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
+ return GST_FLOW_WAIT;
+ }
+}
- if (gap > 0) {
- /* we have a gap */
- GST_DEBUG_OBJECT (jitterbuffer,
- "Sequence number GAP detected: expected %d instead of %d (%d missing)",
- next_seqnum, seqnum, gap);
+/* a packet is lost */
+static GstFlowReturn
+do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
+ GstClockTimeDiff clock_jitter)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ GstClockTime duration = GST_CLOCK_TIME_NONE;
+ guint32 lost_packets = 1;
+ gboolean lost_packets_late = FALSE;
- if (priv->last_out_time != -1) {
- GST_DEBUG_OBJECT (jitterbuffer,
- "out_time %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
- GST_TIME_ARGS (out_time), GST_TIME_ARGS (priv->last_out_time));
- /* interpolate between the current time and the last time based on
- * number of packets we are missing, this is the estimated duration
- * for the missing packet based on equidistant packet spacing. Also make
- * sure we never go negative. */
- if (out_time >= priv->last_out_time)
- duration = (out_time - priv->last_out_time) / (gap + 1);
- else
- goto lost;
+#if 0
+ if (clock_jitter > 0
+ && clock_jitter > (priv->latency_ns + priv->peer_latency)) {
+ GstClockTimeDiff total_duration;
+ GstClockTime out_time_diff;
- GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
- GST_TIME_ARGS (duration));
- /* add this duration to the timestamp of the last packet we pushed */
- out_time = (priv->last_out_time + duration);
- }
- } else {
- /* we don't know what the next_seqnum should be, wait for the last
- * possible moment to push this buffer, maybe we get an earlier seqnum
- * while we wait */
- GST_DEBUG_OBJECT (jitterbuffer, "First buffer %d, do sync", seqnum);
- }
+ out_time_diff = apply_offset (jitterbuffer, timer->pts) - timer->timeout;
+ total_duration = MIN (out_time_diff, clock_jitter);
+
+ if (duration > 0)
+ lost_packets = total_duration / duration;
+ else
+ lost_packets = gap;
+ total_duration = lost_packets * duration;
+
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "Current sync_time has expired a long time ago (+%" GST_TIME_FORMAT
+ ") Cover up %d lost packets with duration %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (clock_jitter),
+ lost_packets, GST_TIME_ARGS (total_duration));
+
+ duration = total_duration;
+ lost_packets_late = TRUE;
+ }
+#endif
+
+ /* we had a gap and thus we lost some packets. Create an event for this. */
+ if (lost_packets > 1)
+ GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", timer->seqnum,
+ timer->seqnum + lost_packets - 1);
+ else
+ GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", timer->seqnum);
+
+ priv->num_late += lost_packets;
+ priv->discont = TRUE;
+
+ /* update our expected next packet */
+ priv->last_popped_seqnum = timer->seqnum;
+ priv->last_out_time = timer->timeout;
+ priv->last_out_pts = timer->pts;
+ priv->next_seqnum = (timer->seqnum + lost_packets) & 0xffff;
+ /* remove timer now */
+ remove_timer (jitterbuffer, timer);
+
+ if (priv->do_lost) {
+ GstEvent *event;
+
+ /* create paket lost event */
+ event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
+ gst_structure_new ("GstRTPPacketLost",
+ "seqnum", G_TYPE_UINT, (guint) priv->last_popped_seqnum,
+ "timestamp", G_TYPE_UINT64, priv->last_out_time,
+ "duration", G_TYPE_UINT64, duration,
+ "late", G_TYPE_BOOLEAN, lost_packets_late, NULL));
+ JBUF_UNLOCK (priv);
+ gst_pad_push_event (priv->srcpad, event);
+ JBUF_LOCK_CHECK (priv, flushing);
+ }
+ return GST_FLOW_OK;
+
+ /* ERRORS */
+flushing:
+ {
+ GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
+ return priv->srcresult;
+ }
+}
+
+static GstFlowReturn
+do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
+{
+ GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
+ remove_timer (jitterbuffer, timer);
+
+ return GST_FLOW_EOS;
+}
+
+/* called when we need to wait for the next timeout.
+ *
+ * We loop over the array of recorded timeouts and wait for the earliest one.
+ * When it timed out, do the logic associated with the timer.
+ *
+ * If there are no timers, we wait on a gcond until something new happens.
+ */
+static GstFlowReturn
+wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ GstFlowReturn result = GST_FLOW_OK;
+ gint i, len;
+ TimerData *timer = NULL;
+
+ len = priv->timers->len;
+ for (i = 0; i < len; i++) {
+ TimerData *test = &g_array_index (priv->timers, TimerData, i);
+
+ /* find the smallest timeout */
+ if (timer == NULL || test->timeout == -1 || test->timeout < timer->timeout)
+ timer = test;
+ }
+ if (timer) {
+ GstClock *clock;
+ GstClockTime sync_time;
+ GstClockID id;
+ GstClockReturn ret;
+ GstClockTimeDiff clock_jitter;
GST_OBJECT_LOCK (jitterbuffer);
clock = GST_ELEMENT_CLOCK (jitterbuffer);
if (!clock) {
GST_OBJECT_UNLOCK (jitterbuffer);
/* let's just push if there is no clock */
- GST_DEBUG_OBJECT (jitterbuffer, "No clock, push right away");
- goto push_buffer;
+ GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
+ goto do_timeout;
}
/* prepare for sync against clock */
- sync_time = get_sync_time (jitterbuffer, out_time);
+ sync_time = get_sync_time (jitterbuffer, timer->timeout);
GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
" with sync time %" GST_TIME_FORMAT,
- GST_TIME_ARGS (out_time), GST_TIME_ARGS (sync_time));
+ GST_TIME_ARGS (timer->timeout), GST_TIME_ARGS (sync_time));
/* create an entry for the clock */
id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
ret = gst_clock_id_wait (id, &clock_jitter);
JBUF_LOCK (priv);
- GST_DEBUG_OBJECT (jitterbuffer, "sync done");
+ GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, %" G_GINT64_FORMAT,
+ ret, clock_jitter);
/* and free the entry */
gst_clock_id_unref (id);
priv->clock_id = NULL;
- if (ret == GST_CLOCK_EARLY && gap > 0
- && clock_jitter > (priv->latency_ns + priv->peer_latency)) {
- GstClockTimeDiff total_duration;
- GstClockTime out_time_diff;
-
- out_time_diff = apply_offset (jitterbuffer, timestamp) - out_time;
- total_duration = MIN (out_time_diff, clock_jitter);
-
- if (duration > 0)
- lost_packets = total_duration / duration;
- else
- lost_packets = gap;
- total_duration = lost_packets * duration;
-
- GST_DEBUG_OBJECT (jitterbuffer,
- "Current sync_time has expired a long time ago (+%" GST_TIME_FORMAT
- ") Cover up %d lost packets with duration %" GST_TIME_FORMAT,
- GST_TIME_ARGS (clock_jitter),
- lost_packets, GST_TIME_ARGS (total_duration));
-
- duration = total_duration;
- lost_packets_late = TRUE;
- }
-
/* at this point, the clock could have been unlocked by a timeout, a new
* tail element was added to the queue or because we are shutting down. Check
* for shutdown first. */
* element became available in the queue or we flushed the queue.
* Grab it and try to push or sync. */
if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) {
- GST_DEBUG_OBJECT (jitterbuffer,
- "Wait got unscheduled, will retry to push with new buffer");
- goto again;
+ GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled");
+ goto done;
}
-
- lost:
- /* we now timed out, this means we lost a packet or finished synchronizing
- * on the first buffer. */
- if (gap > 0) {
- GstEvent *event;
-
- /* we had a gap and thus we lost some packets. Create an event for this. */
- if (lost_packets > 1)
- GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", next_seqnum,
- next_seqnum + lost_packets - 1);
- else
- GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", next_seqnum);
-
- priv->num_late += lost_packets;
- discont = TRUE;
-
- /* update our expected next packet */
- priv->last_popped_seqnum = next_seqnum;
- priv->last_out_time += duration;
- priv->next_seqnum = (next_seqnum + lost_packets) & 0xffff;
-
- if (priv->do_lost) {
- /* create paket lost event */
- event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
- gst_structure_new ("GstRTPPacketLost",
- "seqnum", G_TYPE_UINT, (guint) next_seqnum,
- "timestamp", G_TYPE_UINT64, out_time,
- "duration", G_TYPE_UINT64, duration,
- "late", G_TYPE_BOOLEAN, lost_packets_late, NULL));
- JBUF_UNLOCK (priv);
- gst_pad_push_event (priv->srcpad, event);
- JBUF_LOCK_CHECK (priv, flushing);
- }
- /* look for next packet */
- goto again;
+ do_timeout:
+ switch (timer->type) {
+ case TIMER_TYPE_EXPECTED:
+ remove_timer (jitterbuffer, timer);
+ break;
+ case TIMER_TYPE_LOST:
+ result = do_lost_timeout (jitterbuffer, timer, clock_jitter);
+ break;
+ case TIMER_TYPE_DEADLINE:
+ priv->next_seqnum = timer->seqnum;
+ remove_timer (jitterbuffer, timer);
+ break;
+ case TIMER_TYPE_EOS:
+ result = do_eos_timeout (jitterbuffer, timer);
+ break;
}
-
- /* there was no known gap,just the first packet, exit the loop and push */
- GST_DEBUG_OBJECT (jitterbuffer, "First packet #%d synced", seqnum);
-
- /* get new timestamp, latency might have changed */
- out_time = apply_offset (jitterbuffer, timestamp);
+ } else {
+ /* no timers, wait for activity */
+ GST_DEBUG_OBJECT (jitterbuffer, "waiting");
+ priv->waiting = TRUE;
+ JBUF_WAIT_CHECK (priv, flushing);
+ priv->waiting = FALSE;
+ GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
}
-push_buffer:
-
- /* when we get here we are ready to pop and push the buffer */
- outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
- check_buffering_percent (jitterbuffer, &percent);
+done:
+ return result;
- if (G_UNLIKELY (discont || priv->discont)) {
- /* set DISCONT flag when we missed a packet. We pushed the buffer writable
- * into the jitterbuffer so we can modify now. */
- GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
- priv->discont = FALSE;
- }
- if (G_UNLIKELY (priv->ts_discont)) {
- GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
- priv->ts_discont = FALSE;
+flushing:
+ {
+ GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
+ return priv->srcresult;
}
+}
- /* apply timestamp with offset to buffer now */
- GST_BUFFER_PTS (outbuf) = out_time;
- GST_BUFFER_DTS (outbuf) = out_time;
-
- /* update the elapsed time when we need to check against the npt stop time. */
- update_estimated_eos (jitterbuffer, outbuf);
+/*
+ * This funcion implements the main pushing loop on the source pad.
+ *
+ * It first tries to push as many buffers as possible. If there is a seqnum
+ * mismatch, a timeout is created and this function goes waiting for the
+ * next timeout.
+ */
+static void
+gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
+{
+ GstRtpJitterBufferPrivate *priv;
+ GstFlowReturn result;
- /* now we are ready to push the buffer. Save the seqnum and release the lock
- * so the other end can push stuff in the queue again. */
- priv->last_popped_seqnum = seqnum;
- priv->last_out_time = out_time;
- priv->next_seqnum = (seqnum + 1) & 0xffff;
- JBUF_UNLOCK (priv);
+ priv = jitterbuffer->priv;
- if (percent != -1)
- post_buffering_percent (jitterbuffer, percent);
+ JBUF_LOCK_CHECK (priv, flushing);
+ while (TRUE) {
+ GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
- /* push buffer */
- GST_DEBUG_OBJECT (jitterbuffer,
- "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
- GST_TIME_ARGS (out_time));
- result = gst_pad_push (priv->srcpad, outbuf);
- if (G_UNLIKELY (result != GST_FLOW_OK))
- goto pause;
+ result = handle_next_buffer (jitterbuffer);
+ if (result == GST_FLOW_WAIT) {
+ /* now wait for the next event */
+ result = wait_next_timeout (jitterbuffer);
+ }
+ if (result != GST_FLOW_OK)
+ break;
+ }
+ JBUF_UNLOCK (priv);
- return;
+ /* if we get here we need to pause */
+ goto pause;
/* ERRORS */
-do_eos:
- {
- /* store result, we are flushing now */
- GST_DEBUG_OBJECT (jitterbuffer, "We are EOS, pushing EOS downstream");
- priv->srcresult = GST_FLOW_EOS;
- gst_pad_pause_task (priv->srcpad);
- JBUF_UNLOCK (priv);
- gst_pad_push_event (priv->srcpad, gst_event_new_eos ());
- return;
- }
-do_npt_stop:
- {
- /* store result, we are flushing now */
- GST_DEBUG_OBJECT (jitterbuffer, "We reached the NPT stop");
- JBUF_UNLOCK (priv);
-
- g_signal_emit (jitterbuffer,
- gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP], 0, NULL);
- return;
- }
flushing:
{
- GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
- gst_pad_pause_task (priv->srcpad);
+ result = priv->srcresult;
JBUF_UNLOCK (priv);
- return;
+ goto pause;
}
pause:
{
- GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
- gst_flow_get_name (result));
+ const gchar *reason = gst_flow_get_name (result);
+ GstEvent *event;
- JBUF_LOCK (priv);
- /* store result */
- priv->srcresult = result;
- /* we don't post errors or anything because upstream will do that for us
- * when we pass the return value upstream. */
+ GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
gst_pad_pause_task (priv->srcpad);
- JBUF_UNLOCK (priv);
+ if (result == GST_FLOW_EOS) {
+ event = gst_event_new_eos ();
+ gst_pad_push_event (priv->srcpad, event);
+ }
return;
}
}
-/* collect the info form the lastest RTCP packet and the jittebuffer sync, do
+/* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
* some sanity checks and then emit the handle-sync signal with the parameters.
* This function must be called with the LOCK */
static void