/**
* SECTION:element-gstrtpjitterbuffer
- * @short_description: buffer, reorder and remove duplicate RTP packets to
- * compensate for network oddities.
*
- * <refsect2>
- * <para>
* This element reorders and removes duplicate RTP packets as they are received
* from a network source. It will also wait for missing packets up to a
- * configurable time limit using the ::latency property. Packets arriving too
- * late are considered to be lost packets.
- * </para>
- * <para>
- * This element acts as a live element and so adds ::latency to the pipeline.
- * </para>
- * <para>
+ * configurable time limit using the #GstRtpJitterBuffer:latency property.
+ * Packets arriving too late are considered to be lost packets.
+ *
+ * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
+ * to the pipeline.
+ *
* The element needs the clock-rate of the RTP payload in order to estimate the
* delay. This information is obtained either from the caps on the sink pad or,
- * when no caps are present, from the ::request-pt-map signal. To clear the
- * previous pt-map use the ::clear-pt-map signal.
- * </para>
- * <para>
+ * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
+ * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
+ *
* This element will automatically be used inside gstrtpbin.
- * </para>
+ *
+ * <refsect2>
* <title>Example pipelines</title>
- * <para>
- * <programlisting>
+ * |[
* gst-launch rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
- * </programlisting>
- * Connect to a streaming server and decode the MPEG video. The jitterbuffer is
+ * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
* inserted into the pipeline to smooth out network jitter and to reorder the
* out-of-order RTP packets.
- * </para>
* </refsect2>
*
* Last reviewed on 2007-05-28 (0.10.5)
#include "gstrtpjitterbuffer.h"
#include "rtpjitterbuffer.h"
+#include "rtpstats.h"
GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
#define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
#define DEFAULT_LATENCY_MS 200
#define DEFAULT_DROP_ON_LATENCY FALSE
#define DEFAULT_TS_OFFSET 0
+#define DEFAULT_DO_LOST FALSE
enum
{
PROP_0,
PROP_LATENCY,
PROP_DROP_ON_LATENCY,
- PROP_TS_OFFSET
+ PROP_TS_OFFSET,
+ PROP_DO_LOST,
+ PROP_LAST
};
#define JBUF_LOCK(priv) (g_mutex_lock ((priv)->jbuf_lock))
#define JBUF_LOCK_CHECK(priv,label) G_STMT_START { \
JBUF_LOCK (priv); \
- if (priv->srcresult != GST_FLOW_OK) \
+ if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
goto label; \
} G_STMT_END
#define JBUF_WAIT_CHECK(priv,label) G_STMT_START { \
JBUF_WAIT(priv); \
- if (priv->srcresult != GST_FLOW_OK) \
+ if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
goto label; \
} G_STMT_END
RTPJitterBuffer *jbuf;
GMutex *jbuf_lock;
GCond *jbuf_cond;
+ gboolean waiting;
+ gboolean discont;
/* properties */
guint latency_ms;
gboolean drop_on_latency;
gint64 ts_offset;
+ gboolean do_lost;
/* the last seqnum we pushed out */
guint32 last_popped_seqnum;
/* the next expected seqnum */
guint32 next_seqnum;
+ /* last output time */
+ GstClockTime last_out_time;
/* state */
gboolean eos;
/* clock rate and rtp timestamp offset */
+ gint last_pt;
gint32 clock_rate;
gint64 clock_base;
- guint64 exttimestamp;
gint64 prev_ts_offset;
/* when we are shutting down */
/* for sync */
GstSegment segment;
GstClockID clock_id;
- guint32 waiting_seqnum;
/* the latency of the upstream peer, we have to take this into account when
* synchronizing the buffers. */
GstClockTime peer_latency;
guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_rtp_jitter_buffer_get_property (GObject * object,
guint prop_id, GValue * value, GParamSpec * pspec);
-static void gst_rtp_jitter_buffer_dispose (GObject * object);
+static void gst_rtp_jitter_buffer_finalize (GObject * object);
/* element overrides */
static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
/* sinkpad overrides */
static gboolean gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps);
+static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
+ GstEvent * event);
static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
GstEvent * event);
static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
- gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_dispose);
+ gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_finalize);
gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
/**
* GstRtpJitterBuffer::ts-offset:
*
- * Adjust RTP timestamps in the jitterbuffer with offset.
+ * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
+ * This is mainly used to ensure interstream synchronisation.
*/
g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
- g_param_spec_int64 ("ts-offset",
- "Timestamp Offset",
- "Adjust buffer RTP timestamps with offset in nanoseconds", G_MININT64,
- G_MAXINT64, DEFAULT_TS_OFFSET, G_PARAM_READWRITE));
+ g_param_spec_int64 ("ts-offset", "Timestamp Offset",
+ "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
+ G_MAXINT64, DEFAULT_TS_OFFSET,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
+ * GstRtpJitterBuffer::do-lost:
+ *
+ * Send out a GstRTPPacketLost event downstream when a packet is considered
+ * lost.
+ */
+ g_object_class_install_property (gobject_class, PROP_DO_LOST,
+ g_param_spec_boolean ("do-lost", "Do Lost",
+ "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstRtpJitterBuffer::request-pt-map:
* @buffer: the object which received the signal
* GstRtpJitterBuffer::clear-pt-map:
* @buffer: the object which received the signal
*
- * Invalidate the clock-rate as obtained with the ::request-pt-map signal.
+ * Invalidate the clock-rate as obtained with the
+ * #GstRtpJitterBuffer::request-pt-map signal.
*/
gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
GST_DEBUG_CATEGORY_INIT
- (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
+ (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
}
static void
priv->latency_ms = DEFAULT_LATENCY_MS;
priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
+ priv->do_lost = DEFAULT_DO_LOST;
priv->jbuf = rtp_jitter_buffer_new ();
priv->jbuf_lock = g_mutex_new ();
priv->jbuf_cond = g_cond_new ();
- priv->waiting_seqnum = -1;
-
priv->srcpad =
gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
"src");
GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_query));
gst_pad_set_getcaps_function (priv->srcpad,
GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps));
+ gst_pad_set_event_function (priv->srcpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
priv->sinkpad =
gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
}
static void
-gst_rtp_jitter_buffer_dispose (GObject * object)
+gst_rtp_jitter_buffer_finalize (GObject * object)
{
GstRtpJitterBuffer *jitterbuffer;
jitterbuffer = GST_RTP_JITTER_BUFFER (object);
- if (jitterbuffer->priv->jbuf) {
- g_object_unref (jitterbuffer->priv->jbuf);
- jitterbuffer->priv->jbuf = NULL;
- }
- G_OBJECT_CLASS (parent_class)->dispose (object);
+ g_mutex_free (jitterbuffer->priv->jbuf_lock);
+ g_cond_free (jitterbuffer->priv->jbuf_cond);
+
+ g_object_unref (jitterbuffer->priv->jbuf);
+
+ G_OBJECT_CLASS (parent_class)->finalize (object);
}
static void
GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
/* this unblocks any waiting pops on the src pad task */
JBUF_SIGNAL (priv);
- rtp_jitter_buffer_flush (priv->jbuf);
/* unlock clock, we just unschedule, the entry will be released by the
* locking streaming thread. */
if (priv->clock_id)
priv->srcresult = GST_FLOW_OK;
gst_segment_init (&priv->segment, GST_FORMAT_TIME);
priv->last_popped_seqnum = -1;
+ priv->last_out_time = -1;
priv->next_seqnum = -1;
priv->clock_rate = -1;
priv->eos = FALSE;
- priv->exttimestamp = -1;
+ rtp_jitter_buffer_flush (priv->jbuf);
+ rtp_jitter_buffer_reset_skew (priv->jbuf);
JBUF_UNLOCK (priv);
}
priv->clock_rate = -1;
priv->clock_base = -1;
priv->peer_latency = 0;
+ priv->last_pt = -1;
/* block until we go to PLAYING */
priv->blocked = TRUE;
- priv->exttimestamp = -1;
+ /* reset skew detection initialy */
+ rtp_jitter_buffer_reset_skew (priv->jbuf);
JBUF_UNLOCK (priv);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
return ret;
}
-/**
- * Performs comparison 'b - a' with check for overflows.
- */
-static inline gint
-priv_compare_rtp_seq_lt (guint16 a, guint16 b)
+static gboolean
+gst_rtp_jitter_buffer_src_event (GstPad * pad, GstEvent * event)
{
- /* check if diff more than half of the 16bit range */
- if (abs (b - a) > (1 << 15)) {
- /* one of a/b has wrapped */
- return a - b;
- } else {
- return b - a;
+ gboolean ret = TRUE;
+ GstRtpJitterBuffer *jitterbuffer;
+ GstRtpJitterBufferPrivate *priv;
+
+ jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
+ priv = jitterbuffer->priv;
+
+ GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
+
+ switch (GST_EVENT_TYPE (event)) {
+ default:
+ ret = gst_pad_push_event (priv->sinkpad, event);
+ break;
}
+ gst_object_unref (jitterbuffer);
+
+ return ret;
}
static gboolean
g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
&ret);
- caps = (GstCaps *) g_value_get_boxed (&ret);
+ g_value_unset (&args[0]);
+ g_value_unset (&args[1]);
+ caps = (GstCaps *) g_value_dup_boxed (&ret);
+ g_value_unset (&ret);
if (!caps)
goto no_caps;
res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
+ gst_caps_unref (caps);
+
return res;
/* ERRORS */
GstRtpJitterBufferPrivate *priv;
guint16 seqnum;
GstFlowReturn ret = GST_FLOW_OK;
+ GstClockTime timestamp;
+ guint64 latency_ts;
+ gboolean tail;
jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
- if (!gst_rtp_buffer_validate (buffer))
+ if (G_UNLIKELY (!gst_rtp_buffer_validate (buffer)))
goto invalid_buffer;
priv = jitterbuffer->priv;
- if (priv->clock_rate == -1) {
+ if (G_UNLIKELY (priv->last_pt != gst_rtp_buffer_get_payload_type (buffer))) {
+ GstCaps *caps;
+
+ priv->last_pt = gst_rtp_buffer_get_payload_type (buffer);
+ /* reset clock-rate so that we get a new one */
+ priv->clock_rate = -1;
+ /* Try to get the clock-rate from the caps first if we can. If there are no
+ * caps we must fire the signal to get the clock-rate. */
+ if ((caps = GST_BUFFER_CAPS (buffer))) {
+ gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
+ }
+ }
+
+ if (G_UNLIKELY (priv->clock_rate == -1)) {
guint8 pt;
/* no clock rate given on the caps, try to get one with the signal */
pt = gst_rtp_buffer_get_payload_type (buffer);
gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer, pt);
- if (priv->clock_rate == -1)
+ if (G_UNLIKELY (priv->clock_rate == -1))
goto not_negotiated;
}
+ /* take the timestamp of the buffer. This is the time when the packet was
+ * received and is used to calculate jitter and clock skew. We will adjust
+ * this timestamp with the smoothed value after processing it in the
+ * jitterbuffer. */
+ timestamp = GST_BUFFER_TIMESTAMP (buffer);
+ /* bring to running time */
+ timestamp = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
+ timestamp);
+
seqnum = gst_rtp_buffer_get_seq (buffer);
- GST_DEBUG_OBJECT (jitterbuffer, "Received packet #%d", seqnum);
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
+ GST_TIME_ARGS (timestamp));
JBUF_LOCK_CHECK (priv, out_flushing);
/* don't accept more data on EOS */
- if (priv->eos)
+ if (G_UNLIKELY (priv->eos))
goto have_eos;
- /* let's check if this buffer is too late, we cannot accept packets with
- * bigger seqnum than the one we already pushed. */
- if (priv->last_popped_seqnum != -1) {
- if (priv_compare_rtp_seq_lt (priv->last_popped_seqnum, seqnum) < 0)
- goto too_late;
+ /* let's check if this buffer is too late, we can only accept packets with
+ * bigger seqnum than the one we last pushed. */
+ if (G_LIKELY (priv->last_popped_seqnum != -1)) {
+ gint gap;
+ gboolean reset = FALSE;
+
+ gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
+
+ if (G_UNLIKELY (gap <= 0)) {
+ /* priv->last_popped_seqnum >= seqnum, this packet is too late or the
+ * sender might have been restarted with different seqnum. */
+ if (gap < -RTP_MAX_MISORDER) {
+ GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d", gap);
+ reset = TRUE;
+ } else {
+ goto too_late;
+ }
+ } else {
+ /* priv->last_popped_seqnum < seqnum, this is a new packet */
+ if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
+ GST_DEBUG_OBJECT (jitterbuffer, "reset: too many dropped packets %d",
+ gap);
+ reset = TRUE;
+ } else {
+ GST_DEBUG_OBJECT (jitterbuffer, "dropped packets %d but <= %d", gap,
+ RTP_MAX_DROPOUT);
+ }
+ }
+ if (G_UNLIKELY (reset)) {
+ priv->last_popped_seqnum = -1;
+ priv->next_seqnum = -1;
+ rtp_jitter_buffer_reset_skew (priv->jbuf);
+ }
}
/* let's drop oldest packet if the queue is already full and drop-on-latency
* latency is set, we just pump it in the queue and let the other end push it
* out as fast as possible. */
if (priv->latency_ms && priv->drop_on_latency) {
- guint64 latency_ts;
latency_ts =
gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
- if (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts) {
+ if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
GstBuffer *old_buf;
+ old_buf = rtp_jitter_buffer_pop (priv->jbuf);
+
GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d",
- seqnum);
+ gst_rtp_buffer_get_seq (old_buf));
- old_buf = rtp_jitter_buffer_pop (priv->jbuf);
gst_buffer_unref (old_buf);
}
}
+ /* we need to make the metadata writable before pushing it in the jitterbuffer
+ * because the jitterbuffer will update the timestamp */
+ buffer = gst_buffer_make_metadata_writable (buffer);
+
/* now insert the packet into the queue in sorted order. This function returns
* FALSE if a packet with the same seqnum was already in the queue, meaning we
* have a duplicate. */
- if (!rtp_jitter_buffer_insert (priv->jbuf, buffer))
+ if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
+ priv->clock_rate, &tail)))
goto duplicate;
- /* signal addition of new buffer */
- JBUF_SIGNAL (priv);
+ /* signal addition of new buffer when the _loop is waiting. */
+ if (priv->waiting)
+ JBUF_SIGNAL (priv);
/* let's unschedule and unblock any waiting buffers. We only want to do this
- * if there is a currently waiting newer (> seqnum) buffer */
- if (priv->clock_id) {
- if (priv->waiting_seqnum > seqnum) {
- gst_clock_id_unschedule (priv->clock_id);
- GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting buffer");
- }
+ * when the tail buffer changed */
+ if (G_UNLIKELY (priv->clock_id && tail)) {
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "Unscheduling waiting buffer, new tail buffer");
+ gst_clock_id_unschedule (priv->clock_id);
}
GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets",
/* ERRORS */
invalid_buffer:
{
- /* this is fatal and should be filtered earlier */
- GST_ELEMENT_ERROR (jitterbuffer, STREAM, DECODE, (NULL),
- ("Received invalid RTP payload"));
+ /* this is not fatal but should be filtered earlier */
+ GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
+ ("Received invalid RTP payload, dropping"));
gst_buffer_unref (buffer);
gst_object_unref (jitterbuffer);
- return GST_FLOW_ERROR;
+ return GST_FLOW_OK;
}
not_negotiated:
{
GST_WARNING_OBJECT (jitterbuffer, "No clock-rate in caps!");
gst_buffer_unref (buffer);
gst_object_unref (jitterbuffer);
- return GST_FLOW_NOT_NEGOTIATED;
+ return GST_FLOW_OK;
}
out_flushing:
{
}
}
+static GstClockTime
+apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
+{
+ GstRtpJitterBufferPrivate *priv;
+
+ priv = jitterbuffer->priv;
+
+ if (timestamp == -1)
+ return -1;
+
+ /* apply the timestamp offset */
+ timestamp += priv->ts_offset;
+
+ return timestamp;
+}
+
/**
* This funcion will push out buffers on the source pad.
*
* For each pushed buffer, the seqnum is recorded, if the next buffer B has a
* different seqnum (missing packets before B), this function will wait for the
- * missing packet to arrive up to the rtp timestamp of buffer B.
+ * missing packet to arrive up to the timestamp of buffer B.
*/
static void
gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
{
GstRtpJitterBufferPrivate *priv;
- GstBuffer *outbuf = NULL;
+ GstBuffer *outbuf;
GstFlowReturn result;
guint16 seqnum;
- guint32 rtp_time;
- GstClockTime timestamp;
- gint64 running_time;
- guint64 exttimestamp;
- gint ts_offset_rtp;
+ guint32 next_seqnum;
+ GstClockTime timestamp, out_time;
+ gboolean discont = FALSE;
+ gint gap;
priv = jitterbuffer->priv;
JBUF_LOCK_CHECK (priv, flushing);
again:
- GST_DEBUG_OBJECT (jitterbuffer, "Popping item");
+ GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
while (TRUE) {
-
/* always wait if we are blocked */
- if (!priv->blocked) {
- /* if we have a packet, we can grab it */
+ if (G_LIKELY (!priv->blocked)) {
+ /* if we have a packet, we can exit the loop and grab it */
if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0)
break;
/* no packets but we are EOS, do eos logic */
- if (priv->eos)
+ if (G_UNLIKELY (priv->eos))
goto do_eos;
}
- /* wait for packets or flushing now */
+ /* underrun, wait for packets or flushing now */
+ priv->waiting = TRUE;
JBUF_WAIT_CHECK (priv, flushing);
+ priv->waiting = FALSE;
}
- /* pop a buffer, we must have a buffer now */
- outbuf = rtp_jitter_buffer_pop (priv->jbuf);
+ /* peek a buffer, we're just looking at the timestamp and the sequence number.
+ * If all is fine, we'll pop and push it. If the sequence number is wrong we
+ * wait on the timestamp. In the chain function we will unlock the wait when a
+ * new buffer is available. The peeked buffer is valid for as long as we hold
+ * the jitterbuffer lock. */
+ outbuf = rtp_jitter_buffer_peek (priv->jbuf);
+ /* get the seqnum and the next expected seqnum */
seqnum = gst_rtp_buffer_get_seq (outbuf);
+ next_seqnum = priv->next_seqnum;
- /* get the max deadline to wait for the missing packets, this is the time
- * of the currently popped packet */
- rtp_time = gst_rtp_buffer_get_timestamp (outbuf);
- exttimestamp = gst_rtp_buffer_ext_timestamp (&priv->exttimestamp, rtp_time);
+ /* get the timestamp, this is already corrected for clock skew by the
+ * jitterbuffer */
+ timestamp = GST_BUFFER_TIMESTAMP (outbuf);
GST_DEBUG_OBJECT (jitterbuffer,
- "Popped buffer #%d, rtptime %u, exttime %" G_GUINT64_FORMAT
- ", now %d left", seqnum, rtp_time, exttimestamp,
+ "Peeked buffer #%d, expect #%d, timestamp %" GST_TIME_FORMAT
+ ", now %d left", seqnum, next_seqnum, GST_TIME_ARGS (timestamp),
rtp_jitter_buffer_num_packets (priv->jbuf));
+ /* apply our timestamp offset to the incomming buffer, this will be our output
+ * timestamp. */
+ out_time = apply_offset (jitterbuffer, timestamp);
+
+ /* get the gap between this and the previous packet. If we don't know the
+ * previous packet seqnum assume no gap. */
+ if (G_LIKELY (next_seqnum != -1)) {
+ gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
+
+ /* if we have a packet that we already pushed or considered dropped, pop it
+ * off and get the next packet */
+ if (G_UNLIKELY (gap < 0)) {
+ GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
+ seqnum, next_seqnum);
+ outbuf = rtp_jitter_buffer_pop (priv->jbuf);
+ gst_buffer_unref (outbuf);
+ goto again;
+ }
+ } else {
+ GST_DEBUG_OBJECT (jitterbuffer, "no next seqnum known, first packet");
+ gap = -1;
+ }
+
/* If we don't know what the next seqnum should be (== -1) we have to wait
* because it might be possible that we are not receiving this buffer in-order,
* a buffer with a lower seqnum could arrive later and we want to push that
* determine if we have missing a packet. If we have a missing packet (which
* must be before this packet) we can wait for it until the deadline for this
* packet expires. */
- if (priv->next_seqnum == -1 || priv->next_seqnum != seqnum) {
+ if (G_UNLIKELY (gap != 0 && out_time != -1)) {
GstClockID id;
- GstClockTimeDiff jitter;
+ GstClockTime sync_time;
GstClockReturn ret;
GstClock *clock;
+ GstClockTime duration = GST_CLOCK_TIME_NONE;
- if (priv->next_seqnum != -1) {
- /* we expected next_seqnum but received something else, that's a gap */
+ if (gap > 0) {
+ /* we have a gap */
GST_WARNING_OBJECT (jitterbuffer,
- "Sequence number GAP detected -> %d instead of %d", priv->next_seqnum,
- seqnum);
+ "Sequence number GAP detected: expected %d instead of %d (%d missing)",
+ next_seqnum, seqnum, gap);
+
+ if (priv->last_out_time != -1) {
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "out_time %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (out_time), GST_TIME_ARGS (priv->last_out_time));
+ /* interpolate between the current time and the last time based on
+ * number of packets we are missing, this is the estimated duration
+ * for the missing packet based on equidistant packet spacing. Also make
+ * sure we never go negative. */
+ if (out_time > priv->last_out_time)
+ duration = (out_time - priv->last_out_time) / (gap + 1);
+ else
+ goto lost;
+
+ GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (duration));
+ /* add this duration to the timestamp of the last packet we pushed */
+ out_time = (priv->last_out_time + duration);
+ }
} else {
/* we don't know what the next_seqnum should be, wait for the last
* possible moment to push this buffer, maybe we get an earlier seqnum
GST_DEBUG_OBJECT (jitterbuffer, "First buffer %d, do sync", seqnum);
}
- GST_DEBUG_OBJECT (jitterbuffer,
- "exttimestamp %" G_GUINT64_FORMAT ", base %" G_GINT64_FORMAT,
- exttimestamp, priv->clock_base);
-
- /* if no clock_base was given, take first ts as base */
- if (priv->clock_base == -1) {
- GST_DEBUG_OBJECT (jitterbuffer,
- "no clock base, using exttimestamp %" G_GUINT64_FORMAT, exttimestamp);
- priv->clock_base = exttimestamp;
- }
-
- /* take rtp timestamp offset into account, this can wrap around */
- exttimestamp -= priv->clock_base;
-
- /* bring timestamp to gst time */
- timestamp =
- gst_util_uint64_scale_int (exttimestamp, GST_SECOND, priv->clock_rate);
-
- GST_DEBUG_OBJECT (jitterbuffer,
- "exttimestamp %" G_GUINT64_FORMAT ", clock-rate %u, timestamp %"
- GST_TIME_FORMAT, exttimestamp, priv->clock_rate,
- GST_TIME_ARGS (timestamp));
-
- /* bring to running time */
- running_time = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
- timestamp);
-
GST_OBJECT_LOCK (jitterbuffer);
clock = GST_ELEMENT_CLOCK (jitterbuffer);
if (!clock) {
goto push_buffer;
}
- /* add latency, this includes our own latency and the peer latency. */
- running_time += (priv->latency_ms * GST_MSECOND);
- running_time += priv->peer_latency;
-
- GST_DEBUG_OBJECT (jitterbuffer, "sync to running_time %" GST_TIME_FORMAT,
- GST_TIME_ARGS (running_time));
+ GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (out_time));
/* prepare for sync against clock */
- running_time += GST_ELEMENT_CAST (jitterbuffer)->base_time;
+ sync_time = out_time + GST_ELEMENT_CAST (jitterbuffer)->base_time;
+ /* add latency, this includes our own latency and the peer latency. */
+ sync_time += (priv->latency_ms * GST_MSECOND);
+ sync_time += priv->peer_latency;
/* create an entry for the clock */
- id = priv->clock_id = gst_clock_new_single_shot_id (clock, running_time);
- priv->waiting_seqnum = seqnum;
+ id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
GST_OBJECT_UNLOCK (jitterbuffer);
/* release the lock so that the other end can push stuff or unlock */
JBUF_UNLOCK (priv);
- ret = gst_clock_id_wait (id, &jitter);
+ ret = gst_clock_id_wait (id, NULL);
JBUF_LOCK (priv);
/* and free the entry */
gst_clock_id_unref (id);
priv->clock_id = NULL;
- priv->waiting_seqnum = -1;
/* at this point, the clock could have been unlocked by a timeout, a new
* tail element was added to the queue or because we are shutting down. Check
* for shutdown first. */
- if (priv->srcresult != GST_FLOW_OK)
- goto flushing;
+ if G_UNLIKELY
+ ((priv->srcresult != GST_FLOW_OK))
+ goto flushing;
/* if we got unscheduled and we are not flushing, it's because a new tail
* element became available in the queue. Grab it and try to push or sync. */
if (ret == GST_CLOCK_UNSCHEDULED) {
GST_DEBUG_OBJECT (jitterbuffer,
"Wait got unscheduled, will retry to push with new buffer");
- /* reinsert popped buffer into queue */
- if (!rtp_jitter_buffer_insert (priv->jbuf, outbuf)) {
- GST_DEBUG_OBJECT (jitterbuffer,
- "Duplicate packet #%d detected, dropping", seqnum);
- priv->num_duplicates++;
- gst_buffer_unref (outbuf);
- }
goto again;
}
- }
-push_buffer:
- /* check if we are pushing something unexpected */
- if (priv->next_seqnum != -1 && priv->next_seqnum != seqnum) {
- gint dropped;
- /* calc number of missing packets, careful for wraparounds */
- dropped = priv_compare_rtp_seq_lt (priv->next_seqnum, seqnum);
+ lost:
+ /* we now timed out, this means we lost a packet or finished synchronizing
+ * on the first buffer. */
+ if (gap > 0) {
+ GstEvent *event;
+
+ /* we had a gap and thus we lost a packet. Create an event for this. */
+ GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", next_seqnum);
+ priv->num_late++;
+ discont = TRUE;
+
+ if (priv->do_lost) {
+ /* create paket lost event */
+ event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
+ gst_structure_new ("GstRTPPacketLost",
+ "seqnum", G_TYPE_UINT, (guint) next_seqnum,
+ "timestamp", G_TYPE_UINT64, out_time,
+ "duration", G_TYPE_UINT64, duration, NULL));
+ gst_pad_push_event (priv->srcpad, event);
+ }
- GST_DEBUG_OBJECT (jitterbuffer,
- "Pushing DISCONT after dropping %d (%d to %d)", dropped,
- priv->next_seqnum, seqnum);
+ /* update our expected next packet */
+ priv->last_popped_seqnum = next_seqnum;
+ priv->last_out_time = out_time;
+ priv->next_seqnum = (next_seqnum + 1) & 0xffff;
+ /* look for next packet */
+ goto again;
+ }
- /* update stats */
- priv->num_late += dropped;
+ /* there was no known gap,just the first packet, exit the loop and push */
+ GST_DEBUG_OBJECT (jitterbuffer, "First packet #%d synced", seqnum);
- /* set DISCONT flag */
- outbuf = gst_buffer_make_metadata_writable (outbuf);
- GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
+ /* get new timestamp, latency might have changed */
+ out_time = apply_offset (jitterbuffer, timestamp);
}
+push_buffer:
- /* apply the timestamp offset */
- if (priv->ts_offset > 0)
- ts_offset_rtp =
- gst_util_uint64_scale_int (priv->ts_offset, priv->clock_rate,
- GST_SECOND);
- else if (priv->ts_offset < 0)
- ts_offset_rtp =
- -gst_util_uint64_scale_int (-priv->ts_offset, priv->clock_rate,
- GST_SECOND);
- else
- ts_offset_rtp = 0;
-
- if (ts_offset_rtp != 0) {
- guint32 timestamp;
-
- /* if the offset changed, mark with discont */
- if (priv->ts_offset != priv->prev_ts_offset) {
- GST_DEBUG_OBJECT (jitterbuffer, "changing offset to %d", ts_offset_rtp);
- GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
- priv->prev_ts_offset = priv->ts_offset;
- }
+ /* when we get here we are ready to pop and push the buffer */
+ outbuf = rtp_jitter_buffer_pop (priv->jbuf);
- timestamp = gst_rtp_buffer_get_timestamp (outbuf);
- timestamp += ts_offset_rtp;
- gst_rtp_buffer_set_timestamp (outbuf, timestamp);
+ if (G_UNLIKELY (discont || priv->discont)) {
+ /* set DISCONT flag when we missed a packet. We pushed the buffer writable
+ * into the jitterbuffer so we can modify now. */
+ GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
+ priv->discont = FALSE;
}
+ /* apply timestamp with offset to buffer now */
+ GST_BUFFER_TIMESTAMP (outbuf) = out_time;
+
/* now we are ready to push the buffer. Save the seqnum and release the lock
* so the other end can push stuff in the queue again. */
priv->last_popped_seqnum = seqnum;
+ priv->last_out_time = out_time;
priv->next_seqnum = (seqnum + 1) & 0xffff;
JBUF_UNLOCK (priv);
/* push buffer */
- GST_DEBUG_OBJECT (jitterbuffer, "Pushing buffer %d", seqnum);
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
+ GST_TIME_ARGS (out_time));
result = gst_pad_push (priv->srcpad, outbuf);
- if (result != GST_FLOW_OK)
+ if (G_UNLIKELY (result != GST_FLOW_OK))
goto pause;
return;
{
GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
gst_pad_pause_task (priv->srcpad);
- if (outbuf)
- gst_buffer_unref (outbuf);
JBUF_UNLOCK (priv);
return;
}
* own */
GstClockTime min_latency, max_latency;
gboolean us_live;
- GstPad *peer;
GstClockTime our_latency;
- if ((peer = gst_pad_get_peer (priv->sinkpad))) {
- if ((res = gst_pad_query (peer, query))) {
- gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
-
- GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
- GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
- GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
+ if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
+ gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
- /* store this so that we can safely sync on the peer buffers. */
- JBUF_LOCK (priv);
- priv->peer_latency = min_latency;
- JBUF_UNLOCK (priv);
+ GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
+ GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
- our_latency = ((guint64) priv->latency_ms) * GST_MSECOND;
+ /* store this so that we can safely sync on the peer buffers. */
+ JBUF_LOCK (priv);
+ priv->peer_latency = min_latency;
+ our_latency = ((guint64) priv->latency_ms) * GST_MSECOND;
+ JBUF_UNLOCK (priv);
- GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
- GST_TIME_ARGS (our_latency));
+ GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (our_latency));
- min_latency += our_latency;
- /* max_latency can be -1, meaning there is no upper limit for the
- * latency. */
- if (max_latency != -1)
- max_latency += our_latency * GST_MSECOND;
+ /* we add some latency but can buffer an infinite amount of time */
+ min_latency += our_latency;
+ max_latency = -1;
- GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
- GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
- GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
+ GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
+ GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
- gst_query_set_latency (query, TRUE, min_latency, max_latency);
- }
- gst_object_unref (peer);
+ gst_query_set_latency (query, TRUE, min_latency, max_latency);
}
break;
}
res = gst_pad_query_default (pad, query);
break;
}
+
+ gst_object_unref (jitterbuffer);
+
return res;
}
{
guint new_latency, old_latency;
- /* FIXME, not threadsafe */
new_latency = g_value_get_uint (value);
- old_latency = priv->latency_ms;
+ JBUF_LOCK (priv);
+ old_latency = priv->latency_ms;
priv->latency_ms = new_latency;
+ JBUF_UNLOCK (priv);
/* post message if latency changed, this will inform the parent pipeline
* that a latency reconfiguration is possible/needed. */
break;
}
case PROP_DROP_ON_LATENCY:
+ JBUF_LOCK (priv);
priv->drop_on_latency = g_value_get_boolean (value);
+ JBUF_UNLOCK (priv);
break;
case PROP_TS_OFFSET:
JBUF_LOCK (priv);
priv->ts_offset = g_value_get_int64 (value);
+ /* FIXME, we don't really have a method for signaling a timestamp
+ * DISCONT without also making this a data discont. */
+ /* priv->discont = TRUE; */
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_DO_LOST:
+ JBUF_LOCK (priv);
+ priv->do_lost = g_value_get_boolean (value);
JBUF_UNLOCK (priv);
break;
default:
switch (prop_id) {
case PROP_LATENCY:
+ JBUF_LOCK (priv);
g_value_set_uint (value, priv->latency_ms);
+ JBUF_UNLOCK (priv);
break;
case PROP_DROP_ON_LATENCY:
+ JBUF_LOCK (priv);
g_value_set_boolean (value, priv->drop_on_latency);
+ JBUF_UNLOCK (priv);
break;
case PROP_TS_OFFSET:
JBUF_LOCK (priv);
g_value_set_int64 (value, priv->ts_offset);
JBUF_UNLOCK (priv);
break;
+ case PROP_DO_LOST:
+ JBUF_LOCK (priv);
+ g_value_set_boolean (value, priv->do_lost);
+ JBUF_UNLOCK (priv);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
+
+void
+gst_rtp_jitter_buffer_get_sync (GstRtpJitterBuffer * buffer, guint64 * rtptime,
+ guint64 * timestamp)
+{
+ GstRtpJitterBufferPrivate *priv;
+
+ g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (buffer));
+
+ priv = buffer->priv;
+
+ JBUF_LOCK (priv);
+ rtp_jitter_buffer_get_sync (priv->jbuf, rtptime, timestamp);
+ JBUF_UNLOCK (priv);
+}