* Copyright 2007 Nokia Corporation
* @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
* Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
+ * Copyright 2015 Kurento (http://kurento.org/)
+ * @author: Miguel ParĂs <mparisdiaz@gmail.com>
+ * Copyright 2016 Pexip AS
+ * @author: Havard Graff <havard@pexip.com>
+ * @author: Stian Selnes <stian@pexip.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* depayloader or other element to create concealment data or some other logic
* to gracefully handle the missing packets.
*
- * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
+ * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incoming
* buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
* buffer.
*
*
* - If seqnum N arrived, all seqnum older than
* N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
- * immediately. This is to request fast feedback for abonormally reorder
+ * immediately. This is to request fast feedback for abnormally reorder
* packets before any of the previous timeouts is triggered.
*
* A late packet triggers the GstRTPRetransmissionRequest custom upstream
#endif
#include <stdlib.h>
+#include <stdio.h>
#include <string.h>
#include <gst/rtp/gstrtpbuffer.h>
+#include <gst/net/net.h>
#include "gstrtpjitterbuffer.h"
#include "rtpjitterbuffer.h"
#define DEFAULT_LATENCY_MS 200
#define DEFAULT_DROP_ON_LATENCY FALSE
#define DEFAULT_TS_OFFSET 0
+#define DEFAULT_MAX_TS_OFFSET_ADJUSTMENT 0
#define DEFAULT_DO_LOST FALSE
#define DEFAULT_MODE RTP_JITTER_BUFFER_MODE_SLAVE
#define DEFAULT_PERCENT 0
#define DEFAULT_RTX_MIN_RETRY_TIMEOUT -1
#define DEFAULT_RTX_RETRY_PERIOD -1
#define DEFAULT_RTX_MAX_RETRIES -1
+#define DEFAULT_RTX_DEADLINE -1
+#define DEFAULT_RTX_STATS_TIMEOUT 1000
+#define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
+#define DEFAULT_MAX_DROPOUT_TIME 60000
+#define DEFAULT_MAX_MISORDER_TIME 2000
+#define DEFAULT_RFC7273_SYNC FALSE
+#define DEFAULT_FASTSTART_MIN_PACKETS 0
#define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND)
#define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND)
PROP_LATENCY,
PROP_DROP_ON_LATENCY,
PROP_TS_OFFSET,
+ PROP_MAX_TS_OFFSET_ADJUSTMENT,
PROP_DO_LOST,
PROP_MODE,
PROP_PERCENT,
PROP_RTX_MIN_RETRY_TIMEOUT,
PROP_RTX_RETRY_PERIOD,
PROP_RTX_MAX_RETRIES,
- PROP_STATS
+ PROP_RTX_DEADLINE,
+ PROP_RTX_STATS_TIMEOUT,
+ PROP_STATS,
+ PROP_MAX_RTCP_RTP_TIME_DIFF,
+ PROP_MAX_DROPOUT_TIME,
+ PROP_MAX_MISORDER_TIME,
+ PROP_RFC7273_SYNC,
+ PROP_FASTSTART_MIN_PACKETS
};
-#define JBUF_LOCK(priv) (g_mutex_lock (&(priv)->jbuf_lock))
+#define JBUF_LOCK(priv) G_STMT_START { \
+ GST_TRACE("Locking from thread %p", g_thread_self()); \
+ (g_mutex_lock (&(priv)->jbuf_lock)); \
+ GST_TRACE("Locked from thread %p", g_thread_self()); \
+ } G_STMT_END
#define JBUF_LOCK_CHECK(priv,label) G_STMT_START { \
JBUF_LOCK (priv); \
if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
goto label; \
} G_STMT_END
-#define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
+#define JBUF_UNLOCK(priv) G_STMT_START { \
+ GST_TRACE ("Unlocking from thread %p", g_thread_self ()); \
+ (g_mutex_unlock (&(priv)->jbuf_lock)); \
+} G_STMT_END
+
+#define JBUF_WAIT_QUEUE(priv) G_STMT_START { \
+ GST_DEBUG ("waiting queue"); \
+ (priv)->waiting_queue++; \
+ g_cond_wait (&(priv)->jbuf_queue, &(priv)->jbuf_lock); \
+ (priv)->waiting_queue--; \
+ GST_DEBUG ("waiting queue done"); \
+} G_STMT_END
+#define JBUF_SIGNAL_QUEUE(priv) G_STMT_START { \
+ if (G_UNLIKELY ((priv)->waiting_queue)) { \
+ GST_DEBUG ("signal queue, %d waiters", (priv)->waiting_queue); \
+ g_cond_signal (&(priv)->jbuf_queue); \
+ } \
+} G_STMT_END
#define JBUF_WAIT_TIMER(priv) G_STMT_START { \
GST_DEBUG ("waiting timer"); \
- (priv)->waiting_timer = TRUE; \
+ (priv)->waiting_timer++; \
g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock); \
- (priv)->waiting_timer = FALSE; \
+ (priv)->waiting_timer--; \
GST_DEBUG ("waiting timer done"); \
} G_STMT_END
#define JBUF_SIGNAL_TIMER(priv) G_STMT_START { \
if (G_UNLIKELY ((priv)->waiting_timer)) { \
- GST_DEBUG ("signal timer"); \
+ GST_DEBUG ("signal timer, %d waiters", (priv)->waiting_timer); \
g_cond_signal (&(priv)->jbuf_timer); \
} \
} G_STMT_END
} \
} G_STMT_END
+#define GST_BUFFER_IS_RETRANSMISSION(buffer) \
+ GST_BUFFER_FLAG_IS_SET (buffer, GST_RTP_BUFFER_FLAG_RETRANSMISSION)
+
+typedef struct TimerQueue
+{
+ GQueue *timers;
+ GHashTable *hashtable;
+} TimerQueue;
struct _GstRtpJitterBufferPrivate
{
RTPJitterBuffer *jbuf;
GMutex jbuf_lock;
+ gboolean waiting_queue;
+ GCond jbuf_queue;
gboolean waiting_timer;
GCond jbuf_timer;
gboolean waiting_event;
gboolean ts_discont;
gboolean active;
guint64 out_offset;
+ guint32 segment_seqnum;
gboolean timer_running;
GThread *timer_thread;
guint64 latency_ns;
gboolean drop_on_latency;
gint64 ts_offset;
+ guint64 max_ts_offset_adjustment;
gboolean do_lost;
gboolean do_retransmission;
gboolean rtx_next_seqnum;
gint rtx_min_retry_timeout;
gint rtx_retry_period;
gint rtx_max_retries;
+ guint rtx_stats_timeout;
+ gint rtx_deadline_ms;
+ gint max_rtcp_rtp_time_diff;
+ guint32 max_dropout_time;
+ guint32 max_misorder_time;
+ guint faststart_min_packets;
/* the last seqnum we pushed out */
guint32 last_popped_seqnum;
/* last output time */
GstClockTime last_out_time;
/* last valid input timestamp and rtptime pair */
- GstClockTime ips_dts;
+ GstClockTime ips_pts;
guint64 ips_rtptime;
GstClockTime packet_spacing;
+ gint equidistant;
GQueue gap_packets;
/* the next expected seqnum we receive */
- GstClockTime last_in_dts;
- guint32 last_in_seqnum;
+ GstClockTime last_in_pts;
guint32 next_in_seqnum;
GArray *timers;
+ TimerQueue *rtx_stats_timers;
/* start and stop ranges */
GstClockTime npt_start;
gint last_pt;
gint32 clock_rate;
gint64 clock_base;
- gint64 prev_ts_offset;
+ gint64 ts_offset_remainder;
/* when we are shutting down */
GstFlowReturn srcresult;
GstBuffer *last_sr;
/* some accounting */
+ guint64 num_pushed;
+ guint64 num_lost;
guint64 num_late;
guint64 num_duplicates;
guint64 num_rtx_requests;
guint64 num_rtx_failed;
gdouble avg_rtx_num;
guint64 avg_rtx_rtt;
+ RTPPacketRateCtx packet_rate_ctx;
/* for the jitter */
GstClockTime last_dts;
+ GstClockTime last_pts;
guint64 last_rtptime;
GstClockTime avg_jitter;
};
GstClockTime rtx_retry;
GstClockTime rtx_last;
guint num_rtx_retry;
+ guint num_rtx_received;
} TimerData;
-#define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
- (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
- GstRtpJitterBufferPrivate))
-
static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
#define gst_rtp_jitter_buffer_parent_class parent_class
-G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
+G_DEFINE_TYPE_WITH_PRIVATE (GstRtpJitterBuffer, gst_rtp_jitter_buffer,
+ GST_TYPE_ELEMENT);
/* object overrides */
static void gst_rtp_jitter_buffer_set_property (GObject * object,
static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
GstPad * pad);
static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
+static gboolean gst_rtp_jitter_buffer_set_clock (GstElement * element,
+ GstClock * clock);
/* pad overrides */
static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
GstObject * parent, GstEvent * event);
static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
GstObject * parent, GstBuffer * buffer);
+static GstFlowReturn gst_rtp_jitter_buffer_chain_list (GstPad * pad,
+ GstObject * parent, GstBufferList * buffer_list);
static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
GstObject * parent, GstEvent * event);
static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
jitterbuffer);
+static void update_rtx_stats (GstRtpJitterBuffer * jitterbuffer,
+ TimerData * timer, GstClockTime dts, gboolean success);
+
+static TimerQueue *timer_queue_new (void);
+static void timer_queue_free (TimerQueue * queue);
+
static void
gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
{
gobject_class = (GObjectClass *) klass;
gstelement_class = (GstElementClass *) klass;
- g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
-
gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
+ * GstRtpJitterBuffer:max-ts-offset-adjustment:
+ *
+ * The maximum number of nanoseconds per frame that time offset may be
+ * adjusted with. This is used to avoid sudden large changes to time stamps.
+ */
+ g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET_ADJUSTMENT,
+ g_param_spec_uint64 ("max-ts-offset-adjustment",
+ "Max Timestamp Offset Adjustment",
+ "The maximum number of nanoseconds per frame that time stamp "
+ "offsets may be adjusted (0 = no limit).", 0, G_MAXUINT64,
+ DEFAULT_MAX_TS_OFFSET_ADJUSTMENT, G_PARAM_READWRITE |
+ G_PARAM_STATIC_STRINGS));
+
+ /**
* GstRtpJitterBuffer:do-lost:
*
* Send out a GstRTPPacketLost event downstream when a packet is considered
* this much packet reordering.
*
* When -1 is used, the value will be estimated based on observed packet
- * reordering.
+ * reordering. When 0 is used packet reordering alone will not cause a
+ * retransmission event (Since 1.10).
*
* Since: 1.2
*/
g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
- "Sending retransmission event when this much reordering (-1 automatic)",
+ "Sending retransmission event when this much reordering "
+ "(0 disable)",
-1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
"(-1 not limited)", -1, G_MAXINT, DEFAULT_RTX_MAX_RETRIES,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
+ * GstRtpJitterBuffer:rtx-deadline:
+ *
+ * The deadline for a valid RTX request in ms.
+ *
+ * How long the RTX RTCP will be valid for.
+ * When -1 is used, the size of the jitterbuffer will be used.
+ *
+ * Since: 1.10
+ */
+ g_object_class_install_property (gobject_class, PROP_RTX_DEADLINE,
+ g_param_spec_int ("rtx-deadline", "RTX Deadline (ms)",
+ "The deadline for a valid RTX request in milliseconds. "
+ "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DEADLINE,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+/**
+ * GstRtpJitterBuffer::rtx-stats-timeout:
+ *
+ * The time to wait for a retransmitted packet after it has been
+ * considered lost in order to collect RTX statistics.
+ *
+ * Since: 1.10
+ */
+ g_object_class_install_property (gobject_class, PROP_RTX_STATS_TIMEOUT,
+ g_param_spec_uint ("rtx-stats-timeout", "RTX Statistics Timeout",
+ "The time to wait for a retransmitted packet after it has been "
+ "considered lost in order to collect statistics (ms)",
+ 0, G_MAXUINT, DEFAULT_RTX_STATS_TIMEOUT,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
+ g_param_spec_uint ("max-dropout-time", "Max dropout time",
+ "The maximum time (milliseconds) of missing packets tolerated.",
+ 0, G_MAXINT32, DEFAULT_MAX_DROPOUT_TIME,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
+ g_param_spec_uint ("max-misorder-time", "Max misorder time",
+ "The maximum time (milliseconds) of misordered packets tolerated.",
+ 0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
* GstRtpJitterBuffer:stats:
*
* Various jitterbuffer statistics. This property returns a GstStructure
* <listitem>
* <para>
* #guint64
+ * <classname>"num-pushed"</classname>:
+ * the number of packets pushed out.
+ * </para>
+ * </listitem>
+ * <listitem>
+ * <para>
+ * #guint64
+ * <classname>"num-lost"</classname>:
+ * the number of packets considered lost.
+ * </para>
+ * </listitem>
+ * <listitem>
+ * <para>
+ * #guint64
+ * <classname>"num-late"</classname>:
+ * the number of packets arriving too late.
+ * </para>
+ * </listitem>
+ * <listitem>
+ * <para>
+ * #guint64
+ * <classname>"num-duplicates"</classname>:
+ * the number of duplicate packets.
+ * </para>
+ * </listitem>
+ * <listitem>
+ * <para>
+ * #guint64
* <classname>"rtx-count"</classname>:
* the number of retransmissions requested.
* </para>
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
+ * GstRtpJitterBuffer:max-rtcp-rtp-time-diff
+ *
+ * The maximum amount of time in ms that the RTP time in the RTCP SRs
+ * is allowed to be ahead of the last RTP packet we received. Use
+ * -1 to disable ignoring of RTCP packets.
+ *
+ * Since: 1.8
+ */
+ g_object_class_install_property (gobject_class, PROP_MAX_RTCP_RTP_TIME_DIFF,
+ g_param_spec_int ("max-rtcp-rtp-time-diff", "Max RTCP RTP Time Diff",
+ "Maximum amount of time in ms that the RTP time in RTCP SRs "
+ "is allowed to be ahead (-1 disabled)", -1, G_MAXINT,
+ DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
+ g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
+ "Synchronize received streams to the RFC7273 clock "
+ "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
+ * GstRtpJitterBuffer:faststart-min-packets
+ *
+ * The number of consecutive packets needed to start (set to 0 to
+ * disable faststart. The jitterbuffer will by default start after the
+ * latency has elapsed)
+ *
+ * Since: 1.14
+ */
+ g_object_class_install_property (gobject_class, PROP_FASTSTART_MIN_PACKETS,
+ g_param_spec_uint ("faststart-min-packets", "Faststart minimum packets",
+ "The number of consecutive packets needed to start (set to 0 to "
+ "disable faststart. The jitterbuffer will by default start after "
+ "the latency has elapsed)",
+ 0, G_MAXUINT, DEFAULT_FASTSTART_MIN_PACKETS,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
* GstRtpJitterBuffer::request-pt-map:
* @buffer: the object which received the signal
* @pt: the pt
GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
gstelement_class->provide_clock =
GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
+ gstelement_class->set_clock =
+ GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_clock);
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
+ gst_element_class_add_static_pad_template (gstelement_class,
+ &gst_rtp_jitter_buffer_src_template);
+ gst_element_class_add_static_pad_template (gstelement_class,
+ &gst_rtp_jitter_buffer_sink_template);
+ gst_element_class_add_static_pad_template (gstelement_class,
+ &gst_rtp_jitter_buffer_sink_rtcp_template);
gst_element_class_set_static_metadata (gstelement_class,
"RTP packet jitter-buffer", "Filter/Network/RTP",
GST_DEBUG_CATEGORY_INIT
(rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
+ GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_jitter_buffer_chain_rtcp);
}
static void
{
GstRtpJitterBufferPrivate *priv;
- priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
+ priv = gst_rtp_jitter_buffer_get_instance_private (jitterbuffer);
jitterbuffer->priv = priv;
priv->latency_ms = DEFAULT_LATENCY_MS;
priv->latency_ns = priv->latency_ms * GST_MSECOND;
priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
+ priv->ts_offset = DEFAULT_TS_OFFSET;
+ priv->max_ts_offset_adjustment = DEFAULT_MAX_TS_OFFSET_ADJUSTMENT;
priv->do_lost = DEFAULT_DO_LOST;
priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
priv->rtx_next_seqnum = DEFAULT_RTX_NEXT_SEQNUM;
priv->rtx_min_retry_timeout = DEFAULT_RTX_MIN_RETRY_TIMEOUT;
priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
priv->rtx_max_retries = DEFAULT_RTX_MAX_RETRIES;
-
+ priv->rtx_deadline_ms = DEFAULT_RTX_DEADLINE;
+ priv->rtx_stats_timeout = DEFAULT_RTX_STATS_TIMEOUT;
+ priv->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
+ priv->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
+ priv->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
+ priv->faststart_min_packets = DEFAULT_FASTSTART_MIN_PACKETS;
+
+ priv->ts_offset_remainder = 0;
priv->last_dts = -1;
+ priv->last_pts = -1;
priv->last_rtptime = -1;
priv->avg_jitter = 0;
+ priv->segment_seqnum = GST_SEQNUM_INVALID;
priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
+ priv->rtx_stats_timers = timer_queue_new ();
priv->jbuf = rtp_jitter_buffer_new ();
g_mutex_init (&priv->jbuf_lock);
+ g_cond_init (&priv->jbuf_queue);
g_cond_init (&priv->jbuf_timer);
g_cond_init (&priv->jbuf_event);
g_cond_init (&priv->jbuf_query);
g_queue_init (&priv->gap_packets);
+ gst_segment_init (&priv->segment, GST_FORMAT_TIME);
/* reset skew detection initialy */
rtp_jitter_buffer_reset_skew (priv->jbuf);
gst_pad_set_chain_function (priv->sinkpad,
GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
+ gst_pad_set_chain_list_function (priv->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain_list));
gst_pad_set_event_function (priv->sinkpad,
GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
gst_pad_set_query_function (priv->sinkpad,
priv = jitterbuffer->priv;
g_array_free (priv->timers, TRUE);
+ timer_queue_free (priv->rtx_stats_timers);
g_mutex_clear (&priv->jbuf_lock);
+ g_cond_clear (&priv->jbuf_queue);
g_cond_clear (&priv->jbuf_timer);
g_cond_clear (&priv->jbuf_event);
g_cond_clear (&priv->jbuf_query);
return gst_system_clock_obtain ();
}
+static gboolean
+gst_rtp_jitter_buffer_set_clock (GstElement * element, GstClock * clock)
+{
+ GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (element);
+
+ rtp_jitter_buffer_set_pipeline_clock (jitterbuffer->priv->jbuf, clock);
+
+ return GST_ELEMENT_CLASS (parent_class)->set_clock (element, clock);
+}
+
static void
gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
{
}
if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
/* head buffer timestamp and offset gives our output time */
- last_out = item->dts + priv->ts_offset;
+ last_out = item->pts + priv->ts_offset;
} else {
/* use last known time when the buffer is empty */
last_out = priv->last_out_time;
static gboolean
gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
- GstCaps * caps)
+ GstCaps * caps, gint pt)
{
GstRtpJitterBufferPrivate *priv;
GstStructure *caps_struct;
guint val;
+ gint payload = -1;
GstClockTime tval;
+ const gchar *ts_refclk, *mediaclk;
priv = jitterbuffer->priv;
/* first parse the caps */
caps_struct = gst_caps_get_structure (caps, 0);
- GST_DEBUG_OBJECT (jitterbuffer, "got caps");
+ GST_DEBUG_OBJECT (jitterbuffer, "got caps %" GST_PTR_FORMAT, caps);
+
+ if (gst_structure_get_int (caps_struct, "payload", &payload) && pt != -1
+ && payload != pt) {
+ GST_ERROR_OBJECT (jitterbuffer,
+ "Got caps with wrong payload type (got %d, expected %d)", pt, payload);
+ return FALSE;
+ }
+
+ if (payload != -1) {
+ GST_DEBUG_OBJECT (jitterbuffer, "Got payload type %d", payload);
+ priv->last_pt = payload;
+ }
/* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
* measure the amount of data in the buffer */
rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
+ gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate);
+
/* The clock base is the RTP timestamp corrsponding to the npt-start value. We
* can use this to track the amount of time elapsed on the sender. */
if (gst_structure_get_uint (caps_struct, "clock-base", &val))
"npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
+ if ((ts_refclk = gst_structure_get_string (caps_struct, "a-ts-refclk"))) {
+ GstClock *clock = NULL;
+ guint64 clock_offset = -1;
+
+ GST_DEBUG_OBJECT (jitterbuffer, "Have timestamp reference clock %s",
+ ts_refclk);
+
+ if (g_str_has_prefix (ts_refclk, "ntp=")) {
+ if (g_str_has_prefix (ts_refclk, "ntp=/traceable/")) {
+ GST_FIXME_OBJECT (jitterbuffer, "Can't handle traceable NTP clocks");
+ } else {
+ const gchar *host, *portstr;
+ gchar *hostname;
+ guint port;
+
+ host = ts_refclk + sizeof ("ntp=") - 1;
+ if (host[0] == '[') {
+ /* IPv6 */
+ portstr = strchr (host, ']');
+ if (portstr && portstr[1] == ':')
+ portstr = portstr + 1;
+ else
+ portstr = NULL;
+ } else {
+ portstr = strrchr (host, ':');
+ }
+
+
+ if (!portstr || sscanf (portstr, ":%u", &port) != 1)
+ port = 123;
+
+ if (portstr)
+ hostname = g_strndup (host, (portstr - host));
+ else
+ hostname = g_strdup (host);
+
+ clock = gst_ntp_clock_new (NULL, hostname, port, 0);
+ g_free (hostname);
+ }
+ } else if (g_str_has_prefix (ts_refclk, "ptp=IEEE1588-2008:")) {
+ const gchar *domainstr =
+ ts_refclk + sizeof ("ptp=IEEE1588-2008:XX-XX-XX-XX-XX-XX-XX-XX") - 1;
+ guint domain;
+
+ if (domainstr[0] != ':' || sscanf (domainstr, ":%u", &domain) != 1)
+ domain = 0;
+
+ clock = gst_ptp_clock_new (NULL, domain);
+ } else {
+ GST_FIXME_OBJECT (jitterbuffer, "Unsupported timestamp reference clock");
+ }
+
+ if ((mediaclk = gst_structure_get_string (caps_struct, "a-mediaclk"))) {
+ GST_DEBUG_OBJECT (jitterbuffer, "Got media clock %s", mediaclk);
+
+ if (!g_str_has_prefix (mediaclk, "direct=")
+ || sscanf (mediaclk, "direct=%" G_GUINT64_FORMAT, &clock_offset) != 1)
+ GST_FIXME_OBJECT (jitterbuffer, "Unsupported media clock");
+ if (strstr (mediaclk, "rate=") != NULL) {
+ GST_FIXME_OBJECT (jitterbuffer, "Rate property not supported");
+ clock_offset = -1;
+ }
+ }
+
+ rtp_jitter_buffer_set_media_clock (priv->jbuf, clock, clock_offset);
+ } else {
+ rtp_jitter_buffer_set_media_clock (priv->jbuf, NULL, -1);
+ }
+
return TRUE;
/* ERRORS */
/* this unblocks any waiting pops on the src pad task */
JBUF_SIGNAL_EVENT (priv);
JBUF_SIGNAL_QUERY (priv, FALSE);
+ JBUF_SIGNAL_QUEUE (priv);
JBUF_UNLOCK (priv);
}
priv->srcresult = GST_FLOW_OK;
gst_segment_init (&priv->segment, GST_FORMAT_TIME);
priv->last_popped_seqnum = -1;
- priv->last_out_time = -1;
+ priv->last_out_time = GST_CLOCK_TIME_NONE;
priv->next_seqnum = -1;
priv->seqnum_base = -1;
priv->ips_rtptime = -1;
- priv->ips_dts = GST_CLOCK_TIME_NONE;
+ priv->ips_pts = GST_CLOCK_TIME_NONE;
priv->packet_spacing = 0;
priv->next_in_seqnum = -1;
priv->clock_rate = -1;
priv->avg_jitter = 0;
priv->last_dts = -1;
priv->last_rtptime = -1;
+ priv->last_in_pts = 0;
+ priv->equidistant = 0;
+ priv->segment_seqnum = GST_SEQNUM_INVALID;
GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
/* block until we go to PLAYING */
priv->blocked = TRUE;
priv->timer_running = TRUE;
+ priv->srcresult = GST_FLOW_OK;
priv->timer_thread =
g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
JBUF_UNLOCK (priv);
JBUF_LOCK (priv);
gst_buffer_replace (&priv->last_sr, NULL);
priv->timer_running = FALSE;
+ priv->srcresult = GST_FLOW_FLUSHING;
unschedule_current_timer (jitterbuffer);
JBUF_SIGNAL_TIMER (priv);
JBUF_SIGNAL_QUERY (priv, FALSE);
+ JBUF_SIGNAL_QUEUE (priv);
JBUF_UNLOCK (priv);
g_thread_join (priv->timer_thread);
priv->timer_thread = NULL;
GstCaps *caps;
gst_event_parse_caps (event, &caps);
- gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
+ gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, -1);
break;
}
case GST_EVENT_SEGMENT:
- gst_event_copy_segment (event, &priv->segment);
+ {
+ GstSegment segment;
+ gst_event_copy_segment (event, &segment);
+
+ priv->segment_seqnum = gst_event_get_seqnum (event);
/* we need time for now */
- if (priv->segment.format != GST_FORMAT_TIME)
- goto newseg_wrong_format;
+ if (segment.format != GST_FORMAT_TIME) {
+ GST_DEBUG_OBJECT (jitterbuffer, "ignoring non-TIME newsegment");
+ gst_event_unref (event);
- GST_DEBUG_OBJECT (jitterbuffer,
- "segment: %" GST_SEGMENT_FORMAT, &priv->segment);
+ gst_segment_init (&segment, GST_FORMAT_TIME);
+ event = gst_event_new_segment (&segment);
+ gst_event_set_seqnum (event, priv->segment_seqnum);
+ }
+
+ priv->segment = segment;
break;
+ }
case GST_EVENT_EOS:
priv->eos = TRUE;
rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
GST_DEBUG_OBJECT (jitterbuffer, "adding event");
item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
- if (head)
+ if (head || priv->eos)
JBUF_SIGNAL_EVENT (priv);
return TRUE;
-
- /* ERRORS */
-newseg_wrong_format:
- {
- GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
- gst_event_unref (event);
- return FALSE;
- }
}
static gboolean
if (!caps)
goto no_caps;
- res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
+ res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, pt);
gst_caps_unref (caps);
if (G_UNLIKELY (!res))
return message;
}
+static void
+update_offset (GstRtpJitterBuffer * jitterbuffer)
+{
+ GstRtpJitterBufferPrivate *priv;
+
+ priv = jitterbuffer->priv;
+
+ if (priv->ts_offset_remainder != 0) {
+ GST_DEBUG ("adjustment %" G_GUINT64_FORMAT " remain %" G_GINT64_FORMAT
+ " off %" G_GINT64_FORMAT, priv->max_ts_offset_adjustment,
+ priv->ts_offset_remainder, priv->ts_offset);
+ if (ABS (priv->ts_offset_remainder) > priv->max_ts_offset_adjustment) {
+ if (priv->ts_offset_remainder > 0) {
+ priv->ts_offset += priv->max_ts_offset_adjustment;
+ priv->ts_offset_remainder -= priv->max_ts_offset_adjustment;
+ } else {
+ priv->ts_offset -= priv->max_ts_offset_adjustment;
+ priv->ts_offset_remainder += priv->max_ts_offset_adjustment;
+ }
+ } else {
+ priv->ts_offset += priv->ts_offset_remainder;
+ priv->ts_offset_remainder = 0;
+ }
+ }
+}
+
static GstClockTime
apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
{
return timestamp;
}
+static TimerQueue *
+timer_queue_new (void)
+{
+ TimerQueue *queue;
+
+ queue = g_slice_new (TimerQueue);
+ queue->timers = g_queue_new ();
+ queue->hashtable = g_hash_table_new (NULL, NULL);
+
+ return queue;
+}
+
+static void
+timer_queue_free (TimerQueue * queue)
+{
+ if (!queue)
+ return;
+
+ g_hash_table_destroy (queue->hashtable);
+ g_queue_free_full (queue->timers, g_free);
+ g_slice_free (TimerQueue, queue);
+}
+
+static void
+timer_queue_append (TimerQueue * queue, const TimerData * timer,
+ GstClockTime timeout, gboolean lost)
+{
+ TimerData *copy;
+
+ copy = g_memdup (timer, sizeof (*timer));
+ copy->timeout = timeout;
+ copy->type = lost ? TIMER_TYPE_LOST : TIMER_TYPE_EXPECTED;
+ copy->idx = -1;
+
+ GST_LOG ("Append rtx-stats timer #%d, %" GST_TIME_FORMAT,
+ copy->seqnum, GST_TIME_ARGS (copy->timeout));
+ g_queue_push_tail (queue->timers, copy);
+ g_hash_table_insert (queue->hashtable, GINT_TO_POINTER (copy->seqnum), copy);
+}
+
+static void
+timer_queue_clear_until (TimerQueue * queue, GstClockTime timeout)
+{
+ TimerData *test;
+
+ test = g_queue_peek_head (queue->timers);
+ while (test && test->timeout < timeout) {
+ GST_LOG ("Pop rtx-stats timer #%d, %" GST_TIME_FORMAT " < %"
+ GST_TIME_FORMAT, test->seqnum, GST_TIME_ARGS (test->timeout),
+ GST_TIME_ARGS (timeout));
+ g_hash_table_remove (queue->hashtable, GINT_TO_POINTER (test->seqnum));
+ g_free (g_queue_pop_head (queue->timers));
+ test = g_queue_peek_head (queue->timers);
+ }
+}
+
+static TimerData *
+timer_queue_find (TimerQueue * queue, guint16 seqnum)
+{
+ return g_hash_table_lookup (queue->hashtable, GINT_TO_POINTER (seqnum));
+}
+
static TimerData *
-find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
+find_timer (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
TimerData *timer = NULL;
len = priv->timers->len;
for (i = 0; i < len; i++) {
TimerData *test = &g_array_index (priv->timers, TimerData, i);
- if (test->seqnum == seqnum && test->type == type) {
+ if (test->seqnum == seqnum) {
timer = test;
break;
}
timer->rtx_delay = delay;
timer->rtx_retry = 0;
}
+ timer->rtx_last = GST_CLOCK_TIME_NONE;
timer->num_rtx_retry = 0;
+ timer->num_rtx_received = 0;
recalculate_timer (jitterbuffer, timer);
JBUF_SIGNAL_TIMER (priv);
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
gboolean seqchange, timechange;
guint16 oldseq;
+ GstClockTime new_timeout;
- seqchange = timer->seqnum != seqnum;
- timechange = timer->timeout != timeout;
+ oldseq = timer->seqnum;
+ new_timeout = timeout + delay;
+ seqchange = oldseq != seqnum;
+ timechange = timer->timeout != new_timeout;
- if (!seqchange && !timechange)
+ if (!seqchange && !timechange) {
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "No changes in seqnum (%d) and timeout (%" GST_TIME_FORMAT
+ "), skipping", oldseq, GST_TIME_ARGS (timer->timeout));
return;
-
- oldseq = timer->seqnum;
+ }
GST_DEBUG_OBJECT (jitterbuffer,
- "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
- oldseq, seqnum, GST_TIME_ARGS (timeout + delay));
+ "replace timer %d for seqnum %d->%d timeout %" GST_TIME_FORMAT
+ "->%" GST_TIME_FORMAT, timer->type, oldseq, seqnum,
+ GST_TIME_ARGS (timer->timeout), GST_TIME_ARGS (new_timeout));
- timer->timeout = timeout + delay;
+ timer->timeout = new_timeout;
timer->seqnum = seqnum;
if (reset) {
+ GST_DEBUG_OBJECT (jitterbuffer, "reset rtx delay %" GST_TIME_FORMAT
+ "->%" GST_TIME_FORMAT, GST_TIME_ARGS (timer->rtx_delay),
+ GST_TIME_ARGS (delay));
timer->rtx_base = timeout;
timer->rtx_delay = delay;
timer->rtx_retry = 0;
}
- if (seqchange)
+ if (seqchange) {
timer->num_rtx_retry = 0;
+ timer->num_rtx_received = 0;
+ }
if (priv->clock_id) {
/* we changed the seqnum and there is a timer currently waiting with this
TimerData *timer;
/* find the seqnum timer */
- timer = find_timer (jitterbuffer, type, seqnum);
+ timer = find_timer (jitterbuffer, seqnum);
if (timer == NULL) {
timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
} else {
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
guint idx;
+ if (timer->idx == -1)
+ return;
+
if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
unschedule_current_timer (jitterbuffer);
GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
g_array_remove_index_fast (priv->timers, idx);
timer->idx = idx;
+
+ JBUF_SIGNAL_TIMER (priv);
}
static void
GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
g_array_set_size (priv->timers, 0);
unschedule_current_timer (jitterbuffer);
+ JBUF_SIGNAL_TIMER (priv);
}
/* get the extra delay to wait before sending RTX */
return delay;
}
+/* Check if packet with seqnum is already considered definitely lost by being
+ * part of a "lost timer" for multiple packets */
+static gboolean
+already_lost (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ gint i, len;
+
+ len = priv->timers->len;
+ for (i = 0; i < len; i++) {
+ TimerData *test = &g_array_index (priv->timers, TimerData, i);
+ gint gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
+
+ if (test->num > 1 && test->type == TIMER_TYPE_LOST && gap >= 0 &&
+ gap < test->num) {
+ GST_DEBUG ("seqnum #%d already considered definitely lost (#%d->#%d)",
+ seqnum, test->seqnum, (test->seqnum + test->num - 1) & 0xffff);
+ return TRUE;
+ }
+ }
+
+ return FALSE;
+}
+
/* we just received a packet with seqnum and dts.
*
* First check for old seqnum that we are still expecting. If the gap with the
*/
static void
update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
- GstClockTime dts, gboolean do_next_seqnum)
+ GstClockTime dts, GstClockTime pts, gboolean do_next_seqnum,
+ gboolean is_rtx, TimerData * timer)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- TimerData *timer = NULL;
- gint i, len;
- /* go through all timers and unschedule the ones with a large gap, also find
- * the timer for the seqnum */
- len = priv->timers->len;
- for (i = 0; i < len; i++) {
- TimerData *test = &g_array_index (priv->timers, TimerData, i);
- gint gap;
+ /* go through all timers and unschedule the ones with a large gap */
+ if (priv->do_retransmission && priv->rtx_delay_reorder > 0) {
+ gint i, len;
+ len = priv->timers->len;
+ for (i = 0; i < len; i++) {
+ TimerData *test = &g_array_index (priv->timers, TimerData, i);
+ gint gap;
- gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
+ gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
- GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, #%d<->#%d gap %d", i,
- test->type, test->seqnum, seqnum, gap);
+ GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d",
+ test->type, test->seqnum, seqnum, gap);
- if (gap == 0) {
- GST_DEBUG ("found timer for current seqnum");
- /* the timer for the current seqnum */
- timer = test;
- /* when no retransmission, we can stop now, we only need to find the
- * timer for the current seqnum */
- if (!priv->do_retransmission)
- break;
- } else if (gap > priv->rtx_delay_reorder) {
- /* max gap, we exceeded the max reorder distance and we don't expect the
- * missing packet to be this reordered */
- if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
- reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
+ if (gap > priv->rtx_delay_reorder) {
+ /* max gap, we exceeded the max reorder distance and we don't expect the
+ * missing packet to be this reordered */
+ if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
+ reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
+ }
}
}
if (timer && timer->type != TIMER_TYPE_DEADLINE) {
if (timer->num_rtx_retry > 0) {
- GstClockTime rtx_last, delay;
-
- /* we scheduled a retry for this packet and now we have it */
- priv->num_rtx_success++;
- /* all the previous retry attempts failed */
- priv->num_rtx_failed += timer->num_rtx_retry - 1;
- /* number of retries before receiving the packet */
- if (priv->avg_rtx_num == 0.0)
- priv->avg_rtx_num = timer->num_rtx_retry;
- else
- priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
- /* calculate the delay between retransmission request and receiving this
- * packet, start with when we scheduled this timeout last */
- rtx_last = timer->rtx_last;
- if (dts != GST_CLOCK_TIME_NONE && dts > rtx_last) {
- /* we have a valid delay if this packet arrived after we scheduled the
- * request */
- delay = dts - rtx_last;
- if (priv->avg_rtx_rtt == 0)
- priv->avg_rtx_rtt = delay;
- else
- priv->avg_rtx_rtt = (delay + 7 * priv->avg_rtx_rtt) / 8;
- } else
- delay = 0;
-
- GST_LOG_OBJECT (jitterbuffer,
- "RTX success %" G_GUINT64_FORMAT ", failed %" G_GUINT64_FORMAT
- ", requests %" G_GUINT64_FORMAT ", dups %" G_GUINT64_FORMAT
- ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %" GST_TIME_FORMAT,
- priv->num_rtx_success, priv->num_rtx_failed, priv->num_rtx_requests,
- priv->num_duplicates, priv->avg_rtx_num, GST_TIME_ARGS (delay),
- GST_TIME_ARGS (priv->avg_rtx_rtt));
-
- /* don't try to estimate the next seqnum because this is a retransmitted
- * packet and it probably did not arrive with the expected packet
- * spacing. */
- do_next_seqnum = FALSE;
+ if (is_rtx) {
+ update_rtx_stats (jitterbuffer, timer, dts, TRUE);
+ /* don't try to estimate the next seqnum because this is a retransmitted
+ * packet and it probably did not arrive with the expected packet
+ * spacing. */
+ do_next_seqnum = FALSE;
+ }
+
+ if (!is_rtx || timer->num_rtx_retry > 1) {
+ /* Store timer in order to record stats when/if the retransmitted
+ * packet arrives. We should also store timer information if we've
+ * requested retransmission more than once since we may receive
+ * several retransmitted packets. For accuracy we should update the
+ * stats also when the redundant retransmitted packets arrives. */
+ timer_queue_append (priv->rtx_stats_timers, timer,
+ pts + priv->rtx_stats_timeout * GST_MSECOND, FALSE);
+ }
}
}
- if (do_next_seqnum) {
+ if (do_next_seqnum && pts != GST_CLOCK_TIME_NONE) {
GstClockTime expected, delay;
/* calculate expected arrival time of the next seqnum */
- expected = dts + priv->packet_spacing;
+ expected = pts + priv->packet_spacing;
delay = get_rtx_delay (priv);
/* and update/install timer for next seqnum */
- if (timer)
+ GST_DEBUG_OBJECT (jitterbuffer, "Add RTX timer #%d, expected %"
+ GST_TIME_FORMAT ", delay %" GST_TIME_FORMAT ", packet-spacing %"
+ GST_TIME_FORMAT ", jitter %" GST_TIME_FORMAT, priv->next_in_seqnum,
+ GST_TIME_ARGS (expected), GST_TIME_ARGS (delay),
+ GST_TIME_ARGS (priv->packet_spacing), GST_TIME_ARGS (priv->avg_jitter));
+
+ if (timer) {
+ timer->type = TIMER_TYPE_EXPECTED;
reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
delay, TRUE);
- else
+ } else {
add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
expected, delay, priv->packet_spacing);
+ }
} else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
/* if we had a timer, remove it, we don't know when to expect the next
* packet. */
static void
calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
- GstClockTime dts)
+ GstClockTime pts)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
/* we need consecutive seqnums with a different
* rtptime to estimate the packet spacing. */
if (priv->ips_rtptime != rtptime) {
- /* rtptime changed, check dts diff */
- if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) {
- GstClockTime new_packet_spacing = dts - priv->ips_dts;
+ /* rtptime changed, check pts diff */
+ if (priv->ips_pts != -1 && pts != -1 && pts > priv->ips_pts) {
+ GstClockTime new_packet_spacing = pts - priv->ips_pts;
GstClockTime old_packet_spacing = priv->packet_spacing;
/* Biased towards bigger packet spacings to prevent
GST_TIME_ARGS (priv->packet_spacing));
}
priv->ips_rtptime = rtptime;
- priv->ips_dts = dts;
+ priv->ips_pts = pts;
}
}
static void
calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
- guint16 seqnum, GstClockTime dts, gint gap)
+ guint16 seqnum, GstClockTime pts, gint gap)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- GstClockTime total_duration, duration, expected_dts;
+ GstClockTime duration, expected_pts, delay;
TimerType type;
- guint lost_packets = 0;
+ gboolean equidistant = priv->equidistant > 0;
GST_DEBUG_OBJECT (jitterbuffer,
- "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
- GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
+ "pts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (pts), GST_TIME_ARGS (priv->last_in_pts));
- /* the total duration spanned by the missing packets */
- if (dts >= priv->last_in_dts)
- total_duration = dts - priv->last_in_dts;
- else
- total_duration = 0;
-
- /* interpolate between the current time and the last time based on
- * number of packets we are missing, this is the estimated duration
- * for the missing packet based on equidistant packet spacing. */
- duration = total_duration / (gap + 1);
+ if (pts == GST_CLOCK_TIME_NONE) {
+ GST_WARNING_OBJECT (jitterbuffer, "Have no PTS");
+ return;
+ }
- GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
- GST_TIME_ARGS (duration));
+ if (equidistant) {
+ GstClockTime total_duration;
+ /* the total duration spanned by the missing packets */
+ if (pts >= priv->last_in_pts)
+ total_duration = pts - priv->last_in_pts;
+ else
+ total_duration = 0;
- if (total_duration > priv->latency_ns) {
- GstClockTime gap_time;
+ /* interpolate between the current time and the last time based on
+ * number of packets we are missing, this is the estimated duration
+ * for the missing packet based on equidistant packet spacing. */
+ duration = total_duration / (gap + 1);
- gap_time = total_duration - priv->latency_ns;
+ GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (duration));
- if (duration > 0) {
- lost_packets = gap_time / duration;
- gap_time = lost_packets * duration;
- } else {
- lost_packets = gap;
- }
+ if (total_duration > priv->latency_ns) {
+ GstClockTime gap_time;
+ guint lost_packets;
- /* too many lost packets, some of the missing packets are already
- * too late and we can generate lost packet events for them. */
- GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
- " > %" GST_TIME_FORMAT ", consider %u lost",
- GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
- lost_packets);
+ if (duration > 0) {
+ GstClockTime gap_dur = gap * duration;
+ if (gap_dur > priv->latency_ns)
+ gap_time = gap_dur - priv->latency_ns;
+ else
+ gap_time = 0;
+ lost_packets = gap_time / duration;
+ } else {
+ gap_time = total_duration - priv->latency_ns;
+ lost_packets = gap;
+ }
- /* this timer will fire immediately and the lost event will be pushed from
- * the timer thread */
- add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
- priv->last_in_dts + duration, 0, gap_time);
+ /* too many lost packets, some of the missing packets are already
+ * too late and we can generate lost packet events for them. */
+ GST_INFO_OBJECT (jitterbuffer,
+ "lost packets (%d, #%d->#%d) duration too large %" GST_TIME_FORMAT
+ " > %" GST_TIME_FORMAT ", consider %u lost (%" GST_TIME_FORMAT ")",
+ gap, expected, seqnum - 1, GST_TIME_ARGS (total_duration),
+ GST_TIME_ARGS (priv->latency_ns), lost_packets,
+ GST_TIME_ARGS (gap_time));
+
+ /* this timer will fire immediately and the lost event will be pushed from
+ * the timer thread */
+ if (lost_packets > 0) {
+ add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
+ priv->last_in_pts + duration, 0, gap_time);
+ expected += lost_packets;
+ priv->last_in_pts += gap_time;
+ }
+ }
- expected += lost_packets;
- priv->last_in_dts += gap_time;
+ expected_pts = priv->last_in_pts + duration;
+ } else {
+ /* If we cannot assume equidistant packet spacing, the only thing we now
+ * for sure is that the missing packets have expected pts not later than
+ * the last received pts. */
+ duration = 0;
+ expected_pts = pts;
}
- expected_dts = priv->last_in_dts + (lost_packets + 1) * duration;
+ delay = 0;
if (priv->do_retransmission) {
- TimerData *timer;
+ TimerData *timer = find_timer (jitterbuffer, expected);
type = TIMER_TYPE_EXPECTED;
+ delay = get_rtx_delay (priv);
+
/* if we had a timer for the first missing packet, update it. */
- if ((timer = find_timer (jitterbuffer, type, expected))) {
+ if (timer && timer->type == TIMER_TYPE_EXPECTED) {
GstClockTime timeout = timer->timeout;
timer->duration = duration;
- if (timeout > (expected_dts + timer->rtx_retry)) {
- GstClockTime delay = timeout - expected_dts - timer->rtx_retry;
- reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_dts,
+ if (timeout > (expected_pts + delay) && timer->num_rtx_retry == 0) {
+ reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_pts,
delay, TRUE);
}
expected++;
- expected_dts += duration;
+ expected_pts += duration;
}
} else {
type = TIMER_TYPE_LOST;
}
while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
- add_timer (jitterbuffer, type, expected, 0, expected_dts, 0, duration);
- expected_dts += duration;
+ add_timer (jitterbuffer, type, expected, 0, expected_pts, delay, duration);
+ expected_pts += duration;
expected++;
}
}
static void
calculate_jitter (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts,
- guint rtptime)
+ guint32 rtptime)
{
gint32 rtpdiff;
GstClockTimeDiff dtsdiff, rtpdiffns, diff;
else
rtpdiff = 0;
+ /* Guess whether stream currently uses equidistant packet spacing. If we
+ * often see identical timestamps it means the packets are not
+ * equidistant. */
+ if (rtptime == priv->last_rtptime)
+ priv->equidistant -= 2;
+ else
+ priv->equidistant += 1;
+ priv->equidistant = CLAMP (priv->equidistant, -7, 7);
+
priv->last_dts = dts;
priv->last_rtptime = rtptime;
}
static gboolean
-handle_big_gap_buffer (GstRtpJitterBuffer * jitterbuffer, gboolean future,
- GstBuffer * buffer, guint8 pt, guint16 seqnum, gint gap)
+handle_big_gap_buffer (GstRtpJitterBuffer * jitterbuffer, GstBuffer * buffer,
+ guint8 pt, guint16 seqnum, gint gap, guint max_dropout, guint max_misorder)
{
GstRtpJitterBufferPrivate *priv;
guint gap_packets_length;
gboolean reset = FALSE;
+ gboolean future = gap > 0;
priv = jitterbuffer->priv;
GST_DEBUG_OBJECT (jitterbuffer,
"buffer too %s %d < %d, got 5 consecutive ones - reset",
(future ? "new" : "old"), gap,
- (future ? RTP_MAX_DROPOUT : -RTP_MAX_MISORDER));
+ (future ? max_dropout : -max_misorder));
reset = TRUE;
} else if (!all_consecutive) {
g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
GST_DEBUG_OBJECT (jitterbuffer,
"buffer too %s %d < %d, got no 5 consecutive ones - dropping",
(future ? "new" : "old"), gap,
- (future ? RTP_MAX_DROPOUT : -RTP_MAX_MISORDER));
+ (future ? max_dropout : -max_misorder));
buffer = NULL;
} else {
GST_DEBUG_OBJECT (jitterbuffer,
"buffer too %s %d < %d, got %u consecutive ones - waiting",
(future ? "new" : "old"), gap,
- (future ? RTP_MAX_DROPOUT : -RTP_MAX_MISORDER),
- gap_packets_length + 1);
+ (future ? max_dropout : -max_misorder), gap_packets_length + 1);
buffer = NULL;
}
} else {
GST_DEBUG_OBJECT (jitterbuffer,
"buffer too %s %d < %d, first one - waiting", (future ? "new" : "old"),
- gap, -RTP_MAX_MISORDER);
+ gap, -max_misorder);
g_queue_push_tail (&priv->gap_packets, buffer);
buffer = NULL;
}
return reset;
}
+static GstClockTime
+get_current_running_time (GstRtpJitterBuffer * jitterbuffer)
+{
+ GstClock *clock = gst_element_get_clock (GST_ELEMENT_CAST (jitterbuffer));
+ GstClockTime running_time = GST_CLOCK_TIME_NONE;
+
+ if (clock) {
+ GstClockTime base_time =
+ gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer));
+ GstClockTime clock_time = gst_clock_get_time (clock);
+
+ if (clock_time > base_time)
+ running_time = clock_time - base_time;
+ else
+ running_time = 0;
+
+ gst_object_unref (clock);
+ }
+
+ return running_time;
+}
+
+static GstFlowReturn
+gst_rtp_jitter_buffer_reset (GstRtpJitterBuffer * jitterbuffer,
+ GstPad * pad, GstObject * parent, guint16 seqnum)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ GstFlowReturn ret = GST_FLOW_OK;
+ GList *events = NULL, *l;
+ GList *buffers;
+ gboolean head;
+
+ GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
+ rtp_jitter_buffer_flush (priv->jbuf,
+ (GFunc) free_item_and_retain_events, &events);
+ rtp_jitter_buffer_reset_skew (priv->jbuf);
+ remove_all_timers (jitterbuffer);
+ priv->discont = TRUE;
+ priv->last_popped_seqnum = -1;
+
+ if (priv->gap_packets.head) {
+ GstBuffer *gap_buffer = priv->gap_packets.head->data;
+ GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
+
+ gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
+ priv->next_seqnum = gst_rtp_buffer_get_seq (&gap_rtp);
+ gst_rtp_buffer_unmap (&gap_rtp);
+ } else {
+ priv->next_seqnum = seqnum;
+ }
+
+ priv->last_in_pts = -1;
+ priv->next_in_seqnum = -1;
+
+ /* Insert all sticky events again in order, otherwise we would
+ * potentially loose STREAM_START, CAPS or SEGMENT events
+ */
+ events = g_list_reverse (events);
+ for (l = events; l; l = l->next) {
+ RTPJitterBufferItem *item;
+
+ item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
+ rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
+ }
+ g_list_free (events);
+
+ JBUF_SIGNAL_EVENT (priv);
+
+ /* reset spacing estimation when gap */
+ priv->ips_rtptime = -1;
+ priv->ips_pts = GST_CLOCK_TIME_NONE;
+
+ buffers = g_list_copy (priv->gap_packets.head);
+ g_queue_clear (&priv->gap_packets);
+
+ priv->ips_rtptime = -1;
+ priv->ips_pts = GST_CLOCK_TIME_NONE;
+ JBUF_UNLOCK (jitterbuffer->priv);
+
+ for (l = buffers; l; l = l->next) {
+ ret = gst_rtp_jitter_buffer_chain (pad, parent, l->data);
+ l->data = NULL;
+ if (ret != GST_FLOW_OK) {
+ l = l->next;
+ break;
+ }
+ }
+ for (; l; l = l->next)
+ gst_buffer_unref (l->data);
+ g_list_free (buffers);
+
+ return ret;
+}
+
+static gboolean
+gst_rtp_jitter_buffer_fast_start (GstRtpJitterBuffer * jitterbuffer)
+{
+ GstRtpJitterBufferPrivate *priv;
+ RTPJitterBufferItem *item;
+ TimerData *timer;
+
+ priv = jitterbuffer->priv;
+
+ if (priv->faststart_min_packets == 0)
+ return FALSE;
+
+ item = rtp_jitter_buffer_peek (priv->jbuf);
+ if (!item)
+ return FALSE;
+
+ timer = find_timer (jitterbuffer, item->seqnum);
+ if (!timer || timer->type != TIMER_TYPE_DEADLINE)
+ return FALSE;
+
+ if (rtp_jitter_buffer_can_fast_start (priv->jbuf,
+ priv->faststart_min_packets)) {
+ GST_INFO_OBJECT (jitterbuffer, "We found %i consecutive packet, start now",
+ priv->faststart_min_packets);
+ timer->timeout = -1;
+ return TRUE;
+ }
+
+ return FALSE;
+}
+
static GstFlowReturn
gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
GstBuffer * buffer)
gboolean do_next_seqnum = FALSE;
RTPJitterBufferItem *item;
GstMessage *msg = NULL;
+ gboolean estimated_dts = FALSE;
+ gint32 packet_rate, max_dropout, max_misorder;
+ TimerData *timer = NULL;
jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
else if (pts == -1)
pts = dts;
- /* take the DTS of the buffer. This is the time when the packet was
- * received and is used to calculate jitter and clock skew. We will adjust
- * this DTS with the smoothed value after processing it in the
- * jitterbuffer and assign it as the PTS. */
- /* bring to running time */
- dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
+ if (dts == -1) {
+ /* If we have no DTS here, i.e. no capture time, get one from the
+ * clock now to have something to calculate with in the future. */
+ dts = get_current_running_time (jitterbuffer);
+ pts = dts;
+
+ /* Remember that we estimated the DTS if we are running already
+ * and this is not our first packet (or first packet after a reset).
+ * If it's the first packet, we somehow must generate a timestamp for
+ * everything, otherwise we can't calculate any times
+ */
+ estimated_dts = (priv->next_in_seqnum != -1);
+ } else {
+ /* take the DTS of the buffer. This is the time when the packet was
+ * received and is used to calculate jitter and clock skew. We will adjust
+ * this DTS with the smoothed value after processing it in the
+ * jitterbuffer and assign it as the PTS. */
+ /* bring to running time */
+ dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
+ }
GST_DEBUG_OBJECT (jitterbuffer,
- "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d", seqnum,
- GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer));
+ "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d, rtx %d",
+ seqnum, GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer),
+ GST_BUFFER_IS_RETRANSMISSION (buffer));
JBUF_LOCK_CHECK (priv, out_flushing);
/* Try to get the clock-rate from the caps first if we can. If there are no
* caps we must fire the signal to get the clock-rate. */
if ((caps = gst_pad_get_current_caps (pad))) {
- gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
+ gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, pt);
gst_caps_unref (caps);
}
}
if (G_UNLIKELY (priv->clock_rate == -1))
goto no_clock_rate;
+
+ gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate);
}
/* don't accept more data on EOS */
if (G_UNLIKELY (priv->eos))
goto have_eos;
- calculate_jitter (jitterbuffer, dts, rtptime);
+ if (!GST_BUFFER_IS_RETRANSMISSION (buffer))
+ calculate_jitter (jitterbuffer, dts, rtptime);
if (priv->seqnum_base != -1) {
gint gap;
"packet seqnum #%d before seqnum-base #%d", seqnum,
priv->seqnum_base);
gst_buffer_unref (buffer);
- ret = GST_FLOW_OK;
goto finished;
} else if (gap > 16384) {
/* From now on don't compare against the seqnum base anymore as
expected = priv->next_in_seqnum;
+ packet_rate =
+ gst_rtp_packet_rate_ctx_update (&priv->packet_rate_ctx, seqnum, rtptime);
+ max_dropout =
+ gst_rtp_packet_rate_ctx_get_max_dropout (&priv->packet_rate_ctx,
+ priv->max_dropout_time);
+ max_misorder =
+ gst_rtp_packet_rate_ctx_get_max_misorder (&priv->packet_rate_ctx,
+ priv->max_misorder_time);
+ GST_TRACE_OBJECT (jitterbuffer,
+ "packet_rate: %d, max_dropout: %d, max_misorder: %d", packet_rate,
+ max_dropout, max_misorder);
+
/* now check against our expected seqnum */
- if (G_LIKELY (expected != -1)) {
- gint gap;
+ if (G_UNLIKELY (expected == -1)) {
+ GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
+
+ /* calculate a pts based on rtptime and arrival time (dts) */
+ pts =
+ rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts,
+ rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)));
+
+ /* we don't know what the next_in_seqnum should be, wait for the last
+ * possible moment to push this buffer, maybe we get an earlier seqnum
+ * while we wait */
+ set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, pts);
+
+ do_next_seqnum = TRUE;
+ /* take rtptime and pts to calculate packet spacing */
+ priv->ips_rtptime = rtptime;
+ priv->ips_pts = pts;
+ } else {
+ gint gap;
/* now calculate gap */
gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
-
GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
expected, seqnum, gap);
- if (G_LIKELY (gap == 0)) {
- /* packet is expected */
- calculate_packet_spacing (jitterbuffer, rtptime, dts);
- do_next_seqnum = TRUE;
- } else {
- gboolean reset = FALSE;
-
- if (!GST_CLOCK_TIME_IS_VALID (dts)) {
- /* We would run into calculations with GST_CLOCK_TIME_NONE below
- * and can't compensate for anything without DTS on RTP packets
- */
- goto gap_but_no_dts;
- } else if (gap < 0) {
- /* we received an old packet */
- if (G_UNLIKELY (gap != -1 && gap < -RTP_MAX_MISORDER)) {
- reset =
- handle_big_gap_buffer (jitterbuffer, FALSE, buffer, pt, seqnum,
- gap);
- buffer = NULL;
- } else {
- GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
- }
- } else {
- /* new packet, we are missing some packets */
- if (G_UNLIKELY (gap >= RTP_MAX_DROPOUT)) {
- reset =
- handle_big_gap_buffer (jitterbuffer, TRUE, buffer, pt, seqnum,
- gap);
- buffer = NULL;
- } else {
- GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
- /* fill in the gap with EXPECTED timers */
- calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
+ if (G_UNLIKELY (gap > 0 && priv->timers->len >= max_dropout)) {
+ /* If we have timers for more than RTP_MAX_DROPOUT packets
+ * pending this means that we have a huge gap overall. We can
+ * reset the jitterbuffer at this point because there's
+ * just too much data missing to be able to do anything
+ * sensible with the past data. Just try again from the
+ * next packet */
+ GST_WARNING_OBJECT (jitterbuffer, "%d pending timers > %d - resetting",
+ priv->timers->len, max_dropout);
+ gst_buffer_unref (buffer);
+ return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum);
+ }
- do_next_seqnum = TRUE;
- }
+ /* Special handling of large gaps */
+ if ((gap != -1 && gap < -max_misorder) || (gap >= max_dropout)) {
+ gboolean reset = handle_big_gap_buffer (jitterbuffer, buffer, pt, seqnum,
+ gap, max_dropout, max_misorder);
+ if (reset) {
+ return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum);
+ } else {
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "Had big gap, waiting for more consecutive packets");
+ goto finished;
}
- if (G_UNLIKELY (reset)) {
- GList *events = NULL, *l;
- GList *buffers;
-
- GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
- rtp_jitter_buffer_flush (priv->jbuf,
- (GFunc) free_item_and_retain_events, &events);
- rtp_jitter_buffer_reset_skew (priv->jbuf);
- remove_all_timers (jitterbuffer);
- priv->discont = TRUE;
- priv->last_popped_seqnum = -1;
- priv->next_seqnum = seqnum;
-
- priv->last_in_seqnum = -1;
- priv->last_in_dts = -1;
- priv->next_in_seqnum = -1;
-
- /* Insert all sticky events again in order, otherwise we would
- * potentially loose STREAM_START, CAPS or SEGMENT events
- */
- events = g_list_reverse (events);
- for (l = events; l; l = l->next) {
- RTPJitterBufferItem *item;
-
- item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
- rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
- }
- g_list_free (events);
-
- JBUF_SIGNAL_EVENT (priv);
-
- /* reset spacing estimation when gap */
- priv->ips_rtptime = -1;
- priv->ips_dts = GST_CLOCK_TIME_NONE;
+ }
- buffers = g_list_copy (priv->gap_packets.head);
- g_queue_clear (&priv->gap_packets);
+ /* We had no huge gap, let's drop all the gap packets */
+ GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets");
+ g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
+ g_queue_clear (&priv->gap_packets);
- priv->ips_rtptime = -1;
- priv->ips_dts = GST_CLOCK_TIME_NONE;
- JBUF_UNLOCK (jitterbuffer->priv);
+ /* calculate a pts based on rtptime and arrival time (dts) */
+ /* If we estimated the DTS, don't consider it in the clock skew calculations */
+ pts =
+ rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts,
+ rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)));
- for (l = buffers; l; l = l->next) {
- ret = gst_rtp_jitter_buffer_chain (pad, parent, l->data);
- l->data = NULL;
- if (ret != GST_FLOW_OK)
- break;
- }
- for (; l; l = l->next)
- gst_buffer_unref (l->data);
- g_list_free (buffers);
+ if (G_LIKELY (gap == 0)) {
+ /* packet is expected */
+ calculate_packet_spacing (jitterbuffer, rtptime, pts);
+ do_next_seqnum = TRUE;
+ } else {
- return ret;
+ /* we have a gap */
+ if (gap > 0) {
+ GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
+ /* fill in the gap with EXPECTED timers */
+ calculate_expected (jitterbuffer, expected, seqnum, pts, gap);
+ do_next_seqnum = TRUE;
+ } else {
+ GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
+ do_next_seqnum = FALSE;
}
+
/* reset spacing estimation when gap */
priv->ips_rtptime = -1;
- priv->ips_dts = GST_CLOCK_TIME_NONE;
+ priv->ips_pts = GST_CLOCK_TIME_NONE;
}
- } else {
- GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
- /* we don't know what the next_in_seqnum should be, wait for the last
- * possible moment to push this buffer, maybe we get an earlier seqnum
- * while we wait */
- set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
- do_next_seqnum = TRUE;
- /* take rtptime and dts to calculate packet spacing */
- priv->ips_rtptime = rtptime;
- priv->ips_dts = dts;
- }
-
- /* We had no huge gap, let's drop all the gap packets */
- if (buffer != NULL) {
- GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets");
- g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
- g_queue_clear (&priv->gap_packets);
- } else {
- GST_DEBUG_OBJECT (jitterbuffer,
- "Had big gap, waiting for more consecutive packets");
- JBUF_UNLOCK (jitterbuffer->priv);
- return GST_FLOW_OK;
}
if (do_next_seqnum) {
- priv->last_in_seqnum = seqnum;
- priv->last_in_dts = dts;
+ priv->last_in_pts = pts;
priv->next_in_seqnum = (seqnum + 1) & 0xffff;
}
+ timer = find_timer (jitterbuffer, seqnum);
+ if (GST_BUFFER_IS_RETRANSMISSION (buffer)) {
+ if (!timer)
+ timer = timer_queue_find (priv->rtx_stats_timers, seqnum);
+ if (timer)
+ timer->num_rtx_received++;
+ }
+
+ /* At 2^15, we would detect a seqnum rollover too early, therefore
+ * limit the queue size. But let's not limit it to a number that is
+ * too small to avoid emptying it needlessly if there is a spurious huge
+ * sequence number, let's allow at least 10k packets in any case. */
+ while (rtp_jitter_buffer_get_seqnum_diff (priv->jbuf) >= 32765 &&
+ rtp_jitter_buffer_num_packets (priv->jbuf) > 10000 &&
+ priv->srcresult == GST_FLOW_OK)
+ JBUF_WAIT_QUEUE (priv);
+ if (priv->srcresult != GST_FLOW_OK)
+ goto out_flushing;
+
/* let's check if this buffer is too late, we can only accept packets with
* bigger seqnum than the one we last pushed. */
if (G_LIKELY (priv->last_popped_seqnum != -1)) {
gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
/* priv->last_popped_seqnum >= seqnum, we're too late. */
- if (G_UNLIKELY (gap <= 0))
+ if (G_UNLIKELY (gap <= 0)) {
+ if (priv->do_retransmission) {
+ if (GST_BUFFER_IS_RETRANSMISSION (buffer) && timer) {
+ update_rtx_stats (jitterbuffer, timer, dts, FALSE);
+ /* Only count the retranmitted packet too late if it has been
+ * considered lost. If the original packet arrived before the
+ * retransmitted we just count it as a duplicate. */
+ if (timer->type != TIMER_TYPE_LOST)
+ goto rtx_duplicate;
+ }
+ }
goto too_late;
+ }
}
+ if (already_lost (jitterbuffer, seqnum))
+ goto already_lost;
+
/* let's drop oldest packet if the queue is already full and drop-on-latency
* is set. We can only do this when there actually is a latency. When no
* latency is set, we just pump it in the queue and let the other end push it
old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
old_item);
- priv->next_seqnum = (old_item->seqnum + 1) & 0xffff;
+ priv->next_seqnum = (old_item->seqnum + old_item->count) & 0xffff;
free_item (old_item);
}
/* we might have removed some head buffers, signal the pushing thread to
}
}
- item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
+ /* If we estimated the DTS, don't consider it in the clock skew calculations
+ * later. The code above always sets dts to pts or the other way around if
+ * any of those is valid in the buffer, so we know that if we estimated the
+ * dts that both are unknown */
+ if (estimated_dts)
+ item =
+ alloc_item (buffer, ITEM_TYPE_BUFFER, GST_CLOCK_TIME_NONE,
+ pts, seqnum, 1, rtptime);
+ else
+ item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
/* now insert the packet into the queue in sorted order. This function returns
* FALSE if a packet with the same seqnum was already in the queue, meaning we
* have a duplicate. */
- if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
- &head, &percent)))
+ if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item, &head,
+ &percent))) {
+ if (GST_BUFFER_IS_RETRANSMISSION (buffer) && timer)
+ update_rtx_stats (jitterbuffer, timer, dts, FALSE);
goto duplicate;
+ }
+
+ /* Trigger fast start if needed */
+ if (gst_rtp_jitter_buffer_fast_start (jitterbuffer))
+ head = TRUE;
/* update timers */
- update_timers (jitterbuffer, seqnum, dts, do_next_seqnum);
+ update_timers (jitterbuffer, seqnum, dts, pts, do_next_seqnum,
+ GST_BUFFER_IS_RETRANSMISSION (buffer), timer);
/* we had an unhandled SR, handle it now */
if (priv->last_sr)
}
too_late:
{
- GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
+ GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
" popped, dropping", seqnum, priv->last_popped_seqnum);
priv->num_late++;
gst_buffer_unref (buffer);
goto finished;
}
+already_lost:
+ {
+ GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as it was already "
+ "considered lost", seqnum);
+ priv->num_late++;
+ gst_buffer_unref (buffer);
+ goto finished;
+ }
duplicate:
{
- GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
+ GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
seqnum);
priv->num_duplicates++;
free_item (item);
goto finished;
}
-gap_but_no_dts:
+rtx_duplicate:
{
- /* this is fatal as we can't compensate for gaps without DTS */
- GST_ELEMENT_ERROR (jitterbuffer, STREAM, DECODE, (NULL),
- ("Received packet without DTS after a gap"));
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "Duplicate RTX packet #%d detected, dropping", seqnum);
+ priv->num_duplicates++;
gst_buffer_unref (buffer);
- ret = GST_FLOW_ERROR;
goto finished;
}
}
+/* FIXME: hopefully we can do something more efficient here, especially when
+ * all packets are in order and/or outside of the currently cached range.
+ * Still worthwhile to have it, avoids taking/releasing object lock and pad
+ * stream lock for every single buffer in the default chain_list fallback. */
+static GstFlowReturn
+gst_rtp_jitter_buffer_chain_list (GstPad * pad, GstObject * parent,
+ GstBufferList * buffer_list)
+{
+ GstFlowReturn flow_ret = GST_FLOW_OK;
+ guint i, n;
+
+ n = gst_buffer_list_length (buffer_list);
+ for (i = 0; i < n; ++i) {
+ GstBuffer *buf = gst_buffer_list_get (buffer_list, i);
+
+ flow_ret = gst_rtp_jitter_buffer_chain (pad, parent, gst_buffer_ref (buf));
+
+ if (flow_ret != GST_FLOW_OK)
+ break;
+ }
+ gst_buffer_list_unref (buffer_list);
+
+ return flow_ret;
+}
+
static GstClockTime
compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
{
}
/* this is the current time as running-time */
- out_time = item->dts;
+ out_time = item->pts;
if (elapsed > 0)
estimated = gst_util_uint64_scale (out_time, total, elapsed);
}
dts =
- gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
+ gst_segment_position_from_running_time (&priv->segment,
+ GST_FORMAT_TIME, item->dts);
pts =
- gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
+ gst_segment_position_from_running_time (&priv->segment,
+ GST_FORMAT_TIME, item->pts);
+
+ /* if this is a new frame, check if ts_offset needs to be updated */
+ if (pts != priv->last_pts) {
+ update_offset (jitterbuffer);
+ }
/* apply timestamp with offset to buffer now */
GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
/* update the elapsed time when we need to check against the npt stop time. */
update_estimated_eos (jitterbuffer, item);
+ priv->last_pts = pts;
priv->last_out_time = GST_BUFFER_PTS (outbuf);
break;
case ITEM_TYPE_LOST:
priv->next_seqnum = (seqnum + item->count) & 0xffff;
}
msg = check_buffering_percent (jitterbuffer, percent);
+
+ if (type == ITEM_TYPE_EVENT && outevent &&
+ GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
+ g_assert (priv->eos);
+ while (priv->timers->len > 0) {
+ /* Stopping timers */
+ unschedule_current_timer (jitterbuffer);
+ JBUF_WAIT_TIMER (priv);
+ }
+ }
+
JBUF_UNLOCK (priv);
item->data = NULL;
"Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
+ priv->num_pushed++;
result = gst_pad_push (priv->srcpad, outbuf);
JBUF_LOCK_CHECK (priv, out_flushing);
case ITEM_TYPE_EVENT:
/* We got not enough consecutive packets with a huge gap, we can
* as well just drop them here now on EOS */
- if (GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
+ if (outevent && GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets on EOS");
g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
g_queue_clear (&priv->gap_packets);
if (do_push)
gst_pad_push_event (priv->srcpad, outevent);
- else
+ else if (outevent)
gst_event_unref (outevent);
result = GST_FLOW_OK;
handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- GstFlowReturn result = GST_FLOW_OK;
+ GstFlowReturn result;
RTPJitterBufferItem *item;
guint seqnum;
guint32 next_seqnum;
- gint gap;
/* only push buffers when PLAYING and active and not buffering */
if (priv->blocked || !priv->active ||
- rtp_jitter_buffer_is_buffering (priv->jbuf))
+ rtp_jitter_buffer_is_buffering (priv->jbuf)) {
return GST_FLOW_WAIT;
+ }
-again:
/* peek a buffer, we're just looking at the sequence number.
* If all is fine, we'll pop and push it. If the sequence number is wrong we
* wait for a timeout or something to change.
* The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
item = rtp_jitter_buffer_peek (priv->jbuf);
- if (item == NULL)
+ if (item == NULL) {
goto wait;
+ }
/* get the seqnum and the next expected seqnum */
seqnum = item->seqnum;
- if (seqnum == -1)
- goto do_push;
+ if (seqnum == -1) {
+ return pop_and_push_next (jitterbuffer, seqnum);
+ }
next_seqnum = priv->next_seqnum;
* fires, so wait for that */
result = GST_FLOW_WAIT;
} else {
- /* else calculate GAP */
- gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
+ gint gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
if (G_LIKELY (gap == 0)) {
- do_push:
/* no missing packet, pop and push */
result = pop_and_push_next (jitterbuffer, seqnum);
} else if (G_UNLIKELY (gap < 0)) {
- RTPJitterBufferItem *item;
/* if we have a packet that we already pushed or considered dropped, pop it
* off and get the next packet */
GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
seqnum, next_seqnum);
item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
free_item (item);
- goto again;
+ result = GST_FLOW_OK;
} else {
/* the chain function has scheduled timers to request retransmission or
* when to consider the packet lost, wait for that */
GST_DEBUG_OBJECT (jitterbuffer,
"Sequence number GAP detected: expected %d instead of %d (%d missing)",
next_seqnum, seqnum, gap);
- result = GST_FLOW_WAIT;
+ /* if we have reached EOS, just keep processing */
+ if (priv->eos) {
+ result = pop_and_push_next (jitterbuffer, seqnum);
+ result = GST_FLOW_OK;
+ } else {
+ result = GST_FLOW_WAIT;
+ }
}
}
+
return result;
wait:
{
GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
- if (priv->eos)
- result = GST_FLOW_EOS;
- else
- result = GST_FLOW_WAIT;
- return result;
+ if (priv->eos) {
+ return GST_FLOW_EOS;
+ } else {
+ return GST_FLOW_WAIT;
+ }
}
}
return rtx_retry_period;
}
+/*
+ 1. For *larger* rtx-rtt, weigh a new measurement as before (1/8th)
+ 2. For *smaller* rtx-rtt, be a bit more conservative and weigh a bit less (1/16th)
+ 3. For very large measurements (> avg * 2), consider them "outliers"
+ and count them a lot less (1/48th)
+*/
+static void
+update_avg_rtx_rtt (GstRtpJitterBufferPrivate * priv, GstClockTime rtt)
+{
+ gint weight;
+
+ if (priv->avg_rtx_rtt == 0) {
+ priv->avg_rtx_rtt = rtt;
+ return;
+ }
+
+ if (rtt > 2 * priv->avg_rtx_rtt)
+ weight = 48;
+ else if (rtt > priv->avg_rtx_rtt)
+ weight = 8;
+ else
+ weight = 16;
+
+ priv->avg_rtx_rtt = (rtt + (weight - 1) * priv->avg_rtx_rtt) / weight;
+}
+
+static void
+update_rtx_stats (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
+ GstClockTime dts, gboolean success)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ GstClockTime delay;
+
+ if (success) {
+ /* we scheduled a retry for this packet and now we have it */
+ priv->num_rtx_success++;
+ /* all the previous retry attempts failed */
+ priv->num_rtx_failed += timer->num_rtx_retry - 1;
+ } else {
+ /* All retries failed or was too late */
+ priv->num_rtx_failed += timer->num_rtx_retry;
+ }
+
+ /* number of retries before (hopefully) receiving the packet */
+ if (priv->avg_rtx_num == 0.0)
+ priv->avg_rtx_num = timer->num_rtx_retry;
+ else
+ priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
+
+ /* Calculate the delay between retransmission request and receiving this
+ * packet. We have a valid delay if and only if this packet is a response to
+ * our last request. If not we don't know if this is a response to an
+ * earlier request and delay could be way off. For RTT is more important
+ * with correct values than to update for every packet. */
+ if (timer->num_rtx_retry == timer->num_rtx_received &&
+ dts != GST_CLOCK_TIME_NONE && dts > timer->rtx_last) {
+ delay = dts - timer->rtx_last;
+ update_avg_rtx_rtt (priv, delay);
+ } else {
+ delay = 0;
+ }
+
+ GST_LOG_OBJECT (jitterbuffer,
+ "RTX #%d, result %d, success %" G_GUINT64_FORMAT ", failed %"
+ G_GUINT64_FORMAT ", requests %" G_GUINT64_FORMAT ", dups %"
+ G_GUINT64_FORMAT ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %"
+ GST_TIME_FORMAT, timer->seqnum, success, priv->num_rtx_success,
+ priv->num_rtx_failed, priv->num_rtx_requests, priv->num_duplicates,
+ priv->avg_rtx_num, GST_TIME_ARGS (delay),
+ GST_TIME_ARGS (priv->avg_rtx_rtt));
+}
+
/* the timeout for when we expected a packet expired */
static gboolean
do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
GstEvent *event;
guint delay, delay_ms, avg_rtx_rtt_ms;
guint rtx_retry_timeout_ms, rtx_retry_period_ms;
+ guint rtx_deadline_ms;
GstClockTime rtx_retry_period;
GstClockTime rtx_retry_timeout;
GstClock *clock;
rtx_retry_timeout = get_rtx_retry_timeout (priv);
rtx_retry_period = get_rtx_retry_period (priv, rtx_retry_timeout);
- GST_DEBUG_OBJECT (jitterbuffer, "timeout %" GST_TIME_FORMAT ", period %"
- GST_TIME_FORMAT, GST_TIME_ARGS (rtx_retry_timeout),
- GST_TIME_ARGS (rtx_retry_period));
-
delay = timer->rtx_delay + timer->rtx_retry;
delay_ms = GST_TIME_AS_MSECONDS (delay);
rtx_retry_timeout_ms = GST_TIME_AS_MSECONDS (rtx_retry_timeout);
rtx_retry_period_ms = GST_TIME_AS_MSECONDS (rtx_retry_period);
avg_rtx_rtt_ms = GST_TIME_AS_MSECONDS (priv->avg_rtx_rtt);
+ rtx_deadline_ms =
+ priv->rtx_deadline_ms != -1 ? priv->rtx_deadline_ms : priv->latency_ms;
event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
gst_structure_new ("GstRTPRetransmissionRequest",
"retry", G_TYPE_UINT, timer->num_rtx_retry,
"frequency", G_TYPE_UINT, rtx_retry_timeout_ms,
"period", G_TYPE_UINT, rtx_retry_period_ms,
- "deadline", G_TYPE_UINT, priv->latency_ms,
+ "deadline", G_TYPE_UINT, rtx_deadline_ms,
"packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
"avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
+ GST_DEBUG_OBJECT (jitterbuffer, "Request RTX: %" GST_PTR_FORMAT, event);
priv->num_rtx_requests++;
timer->num_rtx_retry++;
GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry);
if ((priv->rtx_max_retries != -1
&& timer->num_rtx_retry >= priv->rtx_max_retries)
- || (timer->rtx_retry + timer->rtx_delay > rtx_retry_period)) {
+ || (timer->rtx_retry + timer->rtx_delay > rtx_retry_period)
+ || (timer->rtx_base + rtx_retry_period < now)) {
GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
/* too many retransmission request, we now convert the timer
* to a lost timer, leave the num_rtx_retry as it is for stats */
GstClockTime now)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- GstClockTime duration, timestamp;
guint seqnum, lost_packets, num_rtx_retry, next_in_seqnum;
- gboolean late, head;
- GstEvent *event;
+ gboolean head;
+ GstEvent *event = NULL;
RTPJitterBufferItem *item;
seqnum = timer->seqnum;
- timestamp = apply_offset (jitterbuffer, timer->timeout);
- duration = timer->duration;
- if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
- duration = priv->packet_spacing;
lost_packets = MAX (timer->num, 1);
- late = timer->num > 0;
num_rtx_retry = timer->num_rtx_retry;
/* we had a gap and thus we lost some packets. Create an event for this. */
else
GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
- priv->num_late += lost_packets;
+ priv->num_lost += lost_packets;
priv->num_rtx_failed += num_rtx_retry;
next_in_seqnum = (seqnum + lost_packets) & 0xffff;
/* we now only accept seqnum bigger than this */
- if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0)
+ if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0) {
priv->next_in_seqnum = next_in_seqnum;
+ priv->last_in_pts = apply_offset (jitterbuffer, timer->timeout);
+ }
- /* create paket lost event */
- event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
- gst_structure_new ("GstRTPPacketLost",
- "seqnum", G_TYPE_UINT, (guint) seqnum,
- "timestamp", G_TYPE_UINT64, timestamp,
- "duration", G_TYPE_UINT64, duration,
- "late", G_TYPE_BOOLEAN, late,
- "retry", G_TYPE_UINT, num_rtx_retry, NULL));
-
+ /* Avoid creating events if we don't need it. Note that we still need to create
+ * the lost *ITEM* since it will be used to notify the outgoing thread of
+ * lost items (so that we can set discont flags and such) */
+ if (priv->do_lost) {
+ GstClockTime duration, timestamp;
+ /* create paket lost event */
+ timestamp = apply_offset (jitterbuffer, timer->timeout);
+ duration = timer->duration;
+ if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
+ duration = priv->packet_spacing;
+ event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
+ gst_structure_new ("GstRTPPacketLost",
+ "seqnum", G_TYPE_UINT, (guint) seqnum,
+ "timestamp", G_TYPE_UINT64, timestamp,
+ "duration", G_TYPE_UINT64, duration,
+ "retry", G_TYPE_UINT, num_rtx_retry, NULL));
+ }
item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
- rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
+ if (!rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL))
+ /* Duplicate */
+ free_item (item);
- /* remove timer now */
+ if (GST_CLOCK_TIME_IS_VALID (timer->rtx_last)) {
+ /* Store info to update stats if the packet arrives too late */
+ timer_queue_append (priv->rtx_stats_timers, timer,
+ now + priv->rtx_stats_timeout * GST_MSECOND, TRUE);
+ }
remove_timer (jitterbuffer, timer);
+
if (head)
JBUF_SIGNAL_EVENT (priv);
GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
remove_timer (jitterbuffer, timer);
if (!priv->eos) {
+ GstEvent *event;
+
/* there was no EOS in the buffer, put one in there now */
- queue_event (jitterbuffer, gst_event_new_eos ());
+ event = gst_event_new_eos ();
+ if (priv->segment_seqnum != GST_SEQNUM_INVALID)
+ gst_event_set_seqnum (event, priv->segment_seqnum);
+ queue_event (jitterbuffer, event);
}
JBUF_SIGNAL_EVENT (priv);
GstClockTime timer_timeout = -1;
gint i, len;
+ /* If we have a clock, update "now" now with the very
+ * latest running time we have. If timers are unscheduled below we
+ * otherwise wouldn't update now (it's only updated when timers
+ * expire), and also for the very first loop iteration now would
+ * otherwise always be 0
+ */
+ GST_OBJECT_LOCK (jitterbuffer);
+ if (priv->eos) {
+ now = GST_CLOCK_TIME_NONE;
+ } else if (GST_ELEMENT_CLOCK (jitterbuffer)) {
+ now =
+ gst_clock_get_time (GST_ELEMENT_CLOCK (jitterbuffer)) -
+ GST_ELEMENT_CAST (jitterbuffer)->base_time;
+ }
+ GST_OBJECT_UNLOCK (jitterbuffer);
+
GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
GST_TIME_ARGS (now));
+ /* Clear expired rtx-stats timers */
+ if (priv->do_retransmission)
+ timer_queue_clear_until (priv->rtx_stats_timers, now);
+
+ /* Iterate "normal" timers */
len = priv->timers->len;
- for (i = 0; i < len; i++) {
+ for (i = 0; i < len;) {
TimerData *test = &g_array_index (priv->timers, TimerData, i);
GstClockTime test_timeout = get_timeout (jitterbuffer, test);
gboolean save_best = FALSE;
- GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
- i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
-
- /* find the smallest timeout */
- if (timer == NULL) {
- save_best = TRUE;
- } else if (timer_timeout == -1) {
- /* we already have an immediate timeout, the new timer must be an
- * immediate timer with smaller seqnum to become the best */
- if (test_timeout == -1
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "%d, %d, %d, %" GST_TIME_FORMAT " diff:%" GST_STIME_FORMAT, i,
+ test->type, test->seqnum, GST_TIME_ARGS (test_timeout),
+ GST_STIME_ARGS ((gint64) (test_timeout - now)));
+
+ /* Weed out anything too late */
+ if (test->type == TIMER_TYPE_LOST &&
+ (test_timeout == -1 || test_timeout <= now)) {
+ GST_DEBUG_OBJECT (jitterbuffer, "Weeding out late entry");
+ do_lost_timeout (jitterbuffer, test, now);
+ if (!priv->timer_running)
+ break;
+ /* We don't move the iterator forward since we just removed the current entry,
+ * but we update the termination condition */
+ len = priv->timers->len;
+ } else {
+ /* find the smallest timeout */
+ if (timer == NULL) {
+ save_best = TRUE;
+ } else if (timer_timeout == -1) {
+ /* we already have an immediate timeout, the new timer must be an
+ * immediate timer with smaller seqnum to become the best */
+ if (test_timeout == -1
+ && (gst_rtp_buffer_compare_seqnum (test->seqnum,
+ timer->seqnum) > 0))
+ save_best = TRUE;
+ } else if (test_timeout == -1) {
+ /* first immediate timer */
+ save_best = TRUE;
+ } else if (test_timeout < timer_timeout) {
+ /* earlier timer */
+ save_best = TRUE;
+ } else if (test_timeout == timer_timeout
&& (gst_rtp_buffer_compare_seqnum (test->seqnum,
- timer->seqnum) > 0))
+ timer->seqnum) > 0)) {
+ /* same timer, smaller seqnum */
save_best = TRUE;
- } else if (test_timeout == -1) {
- /* first immediate timer */
- save_best = TRUE;
- } else if (test_timeout < timer_timeout) {
- /* earlier timer */
- save_best = TRUE;
- } else if (test_timeout == timer_timeout
- && (gst_rtp_buffer_compare_seqnum (test->seqnum,
- timer->seqnum) > 0)) {
- /* same timer, smaller seqnum */
- save_best = TRUE;
- }
- if (save_best) {
- GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
- timer = test;
- timer_timeout = test_timeout;
+ }
+
+ if (save_best) {
+ GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
+ timer = test;
+ timer_timeout = test_timeout;
+ }
+ i++;
}
}
if (timer && !priv->blocked) {
GstClockReturn ret;
GstClockTimeDiff clock_jitter;
- if (timer_timeout == -1 || timer_timeout <= now) {
+ if (timer_timeout == -1 || timer_timeout <= now || priv->eos) {
+ /* We have normally removed all lost timers in the loop above */
+ g_assert (timer->type != TIMER_TYPE_LOST);
+
do_timeout (jitterbuffer, timer, now);
/* check here, do_timeout could have released the lock */
if (!priv->timer_running)
if (ret != GST_CLOCK_UNSCHEDULED) {
now = timer_timeout + MAX (clock_jitter, 0);
- GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
- ret, priv->timer_seqnum, clock_jitter);
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "sync done, %d, #%d, %" GST_STIME_FORMAT, ret, priv->timer_seqnum,
+ GST_STIME_ARGS (clock_jitter));
} else {
GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
}
JBUF_LOCK_CHECK (priv, flushing);
do {
result = handle_next_buffer (jitterbuffer);
+ JBUF_SIGNAL_QUEUE (priv);
if (G_LIKELY (result == GST_FLOW_WAIT)) {
/* now wait for the next event */
JBUF_WAIT_EVENT (priv, flushing);
gst_pad_pause_task (priv->srcpad);
if (result == GST_FLOW_EOS) {
event = gst_event_new_eos ();
+ if (priv->segment_seqnum != GST_SEQNUM_INVALID)
+ gst_event_set_seqnum (event, priv->segment_seqnum);
gst_pad_push_event (priv->srcpad, event);
}
return;
/* check how far ahead it is to our RTP timestamps */
diff = ext_rtptime - last_rtptime;
/* if bigger than 1 second, we drop it */
- if (diff > clock_rate) {
+ if (jitterbuffer->priv->max_rtcp_rtp_time_diff != -1 &&
+ diff >
+ gst_util_uint64_scale (jitterbuffer->priv->max_rtcp_rtp_time_diff,
+ clock_rate, 1000)) {
GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
/* should drop this, but some RTSP servers end up with bogus
* way too ahead RTCP packet when repeated PAUSE/PLAY,
break;
case PROP_TS_OFFSET:
JBUF_LOCK (priv);
- priv->ts_offset = g_value_get_int64 (value);
+ if (priv->max_ts_offset_adjustment != 0) {
+ gint64 new_offset = g_value_get_int64 (value);
+
+ if (new_offset > priv->ts_offset) {
+ priv->ts_offset_remainder = new_offset - priv->ts_offset;
+ } else {
+ priv->ts_offset_remainder = -(priv->ts_offset - new_offset);
+ }
+ } else {
+ priv->ts_offset = g_value_get_int64 (value);
+ priv->ts_offset_remainder = 0;
+ }
priv->ts_discont = TRUE;
JBUF_UNLOCK (priv);
break;
+ case PROP_MAX_TS_OFFSET_ADJUSTMENT:
+ JBUF_LOCK (priv);
+ priv->max_ts_offset_adjustment = g_value_get_uint64 (value);
+ JBUF_UNLOCK (priv);
+ break;
case PROP_DO_LOST:
JBUF_LOCK (priv);
priv->do_lost = g_value_get_boolean (value);
priv->rtx_max_retries = g_value_get_int (value);
JBUF_UNLOCK (priv);
break;
+ case PROP_RTX_DEADLINE:
+ JBUF_LOCK (priv);
+ priv->rtx_deadline_ms = g_value_get_int (value);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_RTX_STATS_TIMEOUT:
+ JBUF_LOCK (priv);
+ priv->rtx_stats_timeout = g_value_get_uint (value);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_MAX_RTCP_RTP_TIME_DIFF:
+ JBUF_LOCK (priv);
+ priv->max_rtcp_rtp_time_diff = g_value_get_int (value);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_MAX_DROPOUT_TIME:
+ JBUF_LOCK (priv);
+ priv->max_dropout_time = g_value_get_uint (value);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_MAX_MISORDER_TIME:
+ JBUF_LOCK (priv);
+ priv->max_misorder_time = g_value_get_uint (value);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_RFC7273_SYNC:
+ JBUF_LOCK (priv);
+ rtp_jitter_buffer_set_rfc7273_sync (priv->jbuf,
+ g_value_get_boolean (value));
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_FASTSTART_MIN_PACKETS:
+ JBUF_LOCK (priv);
+ priv->faststart_min_packets = g_value_get_uint (value);
+ JBUF_UNLOCK (priv);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
g_value_set_int64 (value, priv->ts_offset);
JBUF_UNLOCK (priv);
break;
+ case PROP_MAX_TS_OFFSET_ADJUSTMENT:
+ JBUF_LOCK (priv);
+ g_value_set_uint64 (value, priv->max_ts_offset_adjustment);
+ JBUF_UNLOCK (priv);
+ break;
case PROP_DO_LOST:
JBUF_LOCK (priv);
g_value_set_boolean (value, priv->do_lost);
g_value_set_int (value, priv->rtx_max_retries);
JBUF_UNLOCK (priv);
break;
+ case PROP_RTX_DEADLINE:
+ JBUF_LOCK (priv);
+ g_value_set_int (value, priv->rtx_deadline_ms);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_RTX_STATS_TIMEOUT:
+ JBUF_LOCK (priv);
+ g_value_set_uint (value, priv->rtx_stats_timeout);
+ JBUF_UNLOCK (priv);
+ break;
case PROP_STATS:
g_value_take_boxed (value,
gst_rtp_jitter_buffer_create_stats (jitterbuffer));
break;
+ case PROP_MAX_RTCP_RTP_TIME_DIFF:
+ JBUF_LOCK (priv);
+ g_value_set_int (value, priv->max_rtcp_rtp_time_diff);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_MAX_DROPOUT_TIME:
+ JBUF_LOCK (priv);
+ g_value_set_uint (value, priv->max_dropout_time);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_MAX_MISORDER_TIME:
+ JBUF_LOCK (priv);
+ g_value_set_uint (value, priv->max_misorder_time);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_RFC7273_SYNC:
+ JBUF_LOCK (priv);
+ g_value_set_boolean (value,
+ rtp_jitter_buffer_get_rfc7273_sync (priv->jbuf));
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_FASTSTART_MIN_PACKETS:
+ JBUF_LOCK (priv);
+ g_value_set_uint (value, priv->faststart_min_packets);
+ JBUF_UNLOCK (priv);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
static GstStructure *
gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf)
{
+ GstRtpJitterBufferPrivate *priv = jbuf->priv;
GstStructure *s;
- JBUF_LOCK (jbuf->priv);
+ JBUF_LOCK (priv);
s = gst_structure_new ("application/x-rtp-jitterbuffer-stats",
- "rtx-count", G_TYPE_UINT64, jbuf->priv->num_rtx_requests,
- "rtx-success-count", G_TYPE_UINT64, jbuf->priv->num_rtx_success,
- "rtx-per-packet", G_TYPE_DOUBLE, jbuf->priv->avg_rtx_num,
- "rtx-rtt", G_TYPE_UINT64, jbuf->priv->avg_rtx_rtt, NULL);
- JBUF_UNLOCK (jbuf->priv);
+ "num-pushed", G_TYPE_UINT64, priv->num_pushed,
+ "num-lost", G_TYPE_UINT64, priv->num_lost,
+ "num-late", G_TYPE_UINT64, priv->num_late,
+ "num-duplicates", G_TYPE_UINT64, priv->num_duplicates,
+ "avg-jitter", G_TYPE_UINT64, priv->avg_jitter,
+ "rtx-count", G_TYPE_UINT64, priv->num_rtx_requests,
+ "rtx-success-count", G_TYPE_UINT64, priv->num_rtx_success,
+ "rtx-per-packet", G_TYPE_DOUBLE, priv->avg_rtx_num,
+ "rtx-rtt", G_TYPE_UINT64, priv->avg_rtx_rtt, NULL);
+ JBUF_UNLOCK (priv);
return s;
}