X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Frtpmanager%2Fgstrtpjitterbuffer.c;h=e8b636f0dc8a1c29b2a0a103a1ecdfa86c23ccaf;hb=fee4cf452376ba5c54923cc2fe1d00804f48437d;hp=8f9c51f0600c3ef666640bf5974446d7006d8d9c;hpb=2663388000ddd56859914fc9930b7b0ed971354f;p=platform%2Fupstream%2Fgst-plugins-good.git diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 8f9c51f..e8b636f 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -5,6 +5,11 @@ * Copyright 2007 Nokia Corporation * @author: Philippe Kalaf . * Copyright 2007 Wim Taymans + * Copyright 2015 Kurento (http://kurento.org/) + * @author: Miguel París + * Copyright 2016 Pexip AS + * @author: Havard Graff + * @author: Stian Selnes * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -42,7 +47,7 @@ * depayloader or other element to create concealment data or some other logic * to gracefully handle the missing packets. * - * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming + * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incoming * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing * buffer. * @@ -64,7 +69,7 @@ * * - If seqnum N arrived, all seqnum older than * N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late - * immediately. This is to request fast feedback for abonormally reorder + * immediately. This is to request fast feedback for abnormally reorder * packets before any of the previous timeouts is triggered. * * A late packet triggers the GstRTPRetransmissionRequest custom upstream @@ -98,8 +103,10 @@ #endif #include +#include #include #include +#include #include "gstrtpjitterbuffer.h" #include "rtpjitterbuffer.h" @@ -124,6 +131,7 @@ enum #define DEFAULT_LATENCY_MS 200 #define DEFAULT_DROP_ON_LATENCY FALSE #define DEFAULT_TS_OFFSET 0 +#define DEFAULT_MAX_TS_OFFSET_ADJUSTMENT 0 #define DEFAULT_DO_LOST FALSE #define DEFAULT_MODE RTP_JITTER_BUFFER_MODE_SLAVE #define DEFAULT_PERCENT 0 @@ -136,6 +144,13 @@ enum #define DEFAULT_RTX_MIN_RETRY_TIMEOUT -1 #define DEFAULT_RTX_RETRY_PERIOD -1 #define DEFAULT_RTX_MAX_RETRIES -1 +#define DEFAULT_RTX_DEADLINE -1 +#define DEFAULT_RTX_STATS_TIMEOUT 1000 +#define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000 +#define DEFAULT_MAX_DROPOUT_TIME 60000 +#define DEFAULT_MAX_MISORDER_TIME 2000 +#define DEFAULT_RFC7273_SYNC FALSE +#define DEFAULT_FASTSTART_MIN_PACKETS 0 #define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND) #define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND) @@ -146,6 +161,7 @@ enum PROP_LATENCY, PROP_DROP_ON_LATENCY, PROP_TS_OFFSET, + PROP_MAX_TS_OFFSET_ADJUSTMENT, PROP_DO_LOST, PROP_MODE, PROP_PERCENT, @@ -158,28 +174,56 @@ enum PROP_RTX_MIN_RETRY_TIMEOUT, PROP_RTX_RETRY_PERIOD, PROP_RTX_MAX_RETRIES, - PROP_STATS + PROP_RTX_DEADLINE, + PROP_RTX_STATS_TIMEOUT, + PROP_STATS, + PROP_MAX_RTCP_RTP_TIME_DIFF, + PROP_MAX_DROPOUT_TIME, + PROP_MAX_MISORDER_TIME, + PROP_RFC7273_SYNC, + PROP_FASTSTART_MIN_PACKETS }; -#define JBUF_LOCK(priv) (g_mutex_lock (&(priv)->jbuf_lock)) +#define JBUF_LOCK(priv) G_STMT_START { \ + GST_TRACE("Locking from thread %p", g_thread_self()); \ + (g_mutex_lock (&(priv)->jbuf_lock)); \ + GST_TRACE("Locked from thread %p", g_thread_self()); \ + } G_STMT_END #define JBUF_LOCK_CHECK(priv,label) G_STMT_START { \ JBUF_LOCK (priv); \ if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \ goto label; \ } G_STMT_END -#define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock)) +#define JBUF_UNLOCK(priv) G_STMT_START { \ + GST_TRACE ("Unlocking from thread %p", g_thread_self ()); \ + (g_mutex_unlock (&(priv)->jbuf_lock)); \ +} G_STMT_END + +#define JBUF_WAIT_QUEUE(priv) G_STMT_START { \ + GST_DEBUG ("waiting queue"); \ + (priv)->waiting_queue++; \ + g_cond_wait (&(priv)->jbuf_queue, &(priv)->jbuf_lock); \ + (priv)->waiting_queue--; \ + GST_DEBUG ("waiting queue done"); \ +} G_STMT_END +#define JBUF_SIGNAL_QUEUE(priv) G_STMT_START { \ + if (G_UNLIKELY ((priv)->waiting_queue)) { \ + GST_DEBUG ("signal queue, %d waiters", (priv)->waiting_queue); \ + g_cond_signal (&(priv)->jbuf_queue); \ + } \ +} G_STMT_END #define JBUF_WAIT_TIMER(priv) G_STMT_START { \ GST_DEBUG ("waiting timer"); \ - (priv)->waiting_timer = TRUE; \ + (priv)->waiting_timer++; \ g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock); \ - (priv)->waiting_timer = FALSE; \ + (priv)->waiting_timer--; \ GST_DEBUG ("waiting timer done"); \ } G_STMT_END #define JBUF_SIGNAL_TIMER(priv) G_STMT_START { \ if (G_UNLIKELY ((priv)->waiting_timer)) { \ - GST_DEBUG ("signal timer"); \ + GST_DEBUG ("signal timer, %d waiters", (priv)->waiting_timer); \ g_cond_signal (&(priv)->jbuf_timer); \ } \ } G_STMT_END @@ -217,6 +261,14 @@ enum } \ } G_STMT_END +#define GST_BUFFER_IS_RETRANSMISSION(buffer) \ + GST_BUFFER_FLAG_IS_SET (buffer, GST_RTP_BUFFER_FLAG_RETRANSMISSION) + +typedef struct TimerQueue +{ + GQueue *timers; + GHashTable *hashtable; +} TimerQueue; struct _GstRtpJitterBufferPrivate { @@ -225,6 +277,8 @@ struct _GstRtpJitterBufferPrivate RTPJitterBuffer *jbuf; GMutex jbuf_lock; + gboolean waiting_queue; + GCond jbuf_queue; gboolean waiting_timer; GCond jbuf_timer; gboolean waiting_event; @@ -236,6 +290,7 @@ struct _GstRtpJitterBufferPrivate gboolean ts_discont; gboolean active; guint64 out_offset; + guint32 segment_seqnum; gboolean timer_running; GThread *timer_thread; @@ -245,6 +300,7 @@ struct _GstRtpJitterBufferPrivate guint64 latency_ns; gboolean drop_on_latency; gint64 ts_offset; + guint64 max_ts_offset_adjustment; gboolean do_lost; gboolean do_retransmission; gboolean rtx_next_seqnum; @@ -255,6 +311,12 @@ struct _GstRtpJitterBufferPrivate gint rtx_min_retry_timeout; gint rtx_retry_period; gint rtx_max_retries; + guint rtx_stats_timeout; + gint rtx_deadline_ms; + gint max_rtcp_rtp_time_diff; + guint32 max_dropout_time; + guint32 max_misorder_time; + guint faststart_min_packets; /* the last seqnum we pushed out */ guint32 last_popped_seqnum; @@ -265,18 +327,19 @@ struct _GstRtpJitterBufferPrivate /* last output time */ GstClockTime last_out_time; /* last valid input timestamp and rtptime pair */ - GstClockTime ips_dts; + GstClockTime ips_pts; guint64 ips_rtptime; GstClockTime packet_spacing; + gint equidistant; GQueue gap_packets; /* the next expected seqnum we receive */ - GstClockTime last_in_dts; - guint32 last_in_seqnum; + GstClockTime last_in_pts; guint32 next_in_seqnum; GArray *timers; + TimerQueue *rtx_stats_timers; /* start and stop ranges */ GstClockTime npt_start; @@ -294,7 +357,7 @@ struct _GstRtpJitterBufferPrivate gint last_pt; gint32 clock_rate; gint64 clock_base; - gint64 prev_ts_offset; + gint64 ts_offset_remainder; /* when we are shutting down */ GstFlowReturn srcresult; @@ -312,6 +375,8 @@ struct _GstRtpJitterBufferPrivate GstBuffer *last_sr; /* some accounting */ + guint64 num_pushed; + guint64 num_lost; guint64 num_late; guint64 num_duplicates; guint64 num_rtx_requests; @@ -319,9 +384,11 @@ struct _GstRtpJitterBufferPrivate guint64 num_rtx_failed; gdouble avg_rtx_num; guint64 avg_rtx_rtt; + RTPPacketRateCtx packet_rate_ctx; /* for the jitter */ GstClockTime last_dts; + GstClockTime last_pts; guint64 last_rtptime; GstClockTime avg_jitter; }; @@ -347,12 +414,9 @@ typedef struct GstClockTime rtx_retry; GstClockTime rtx_last; guint num_rtx_retry; + guint num_rtx_received; } TimerData; -#define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \ - (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \ - GstRtpJitterBufferPrivate)) - static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK, @@ -385,7 +449,8 @@ GST_STATIC_PAD_TEMPLATE ("src", static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 }; #define gst_rtp_jitter_buffer_parent_class parent_class -G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT); +G_DEFINE_TYPE_WITH_PRIVATE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, + GST_TYPE_ELEMENT); /* object overrides */ static void gst_rtp_jitter_buffer_set_property (GObject * object, @@ -402,6 +467,8 @@ static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element, static void gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad); static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element); +static gboolean gst_rtp_jitter_buffer_set_clock (GstElement * element, + GstClock * clock); /* pad overrides */ static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter); @@ -413,6 +480,8 @@ static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent, GstEvent * event); static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer); +static GstFlowReturn gst_rtp_jitter_buffer_chain_list (GstPad * pad, + GstObject * parent, GstBufferList * buffer_list); static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent, GstEvent * event); @@ -446,6 +515,12 @@ static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer); static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jitterbuffer); +static void update_rtx_stats (GstRtpJitterBuffer * jitterbuffer, + TimerData * timer, GstClockTime dts, gboolean success); + +static TimerQueue *timer_queue_new (void); +static void timer_queue_free (TimerQueue * queue); + static void gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) { @@ -455,8 +530,6 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) gobject_class = (GObjectClass *) klass; gstelement_class = (GstElementClass *) klass; - g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate)); - gobject_class->finalize = gst_rtp_jitter_buffer_finalize; gobject_class->set_property = gst_rtp_jitter_buffer_set_property; @@ -495,6 +568,20 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** + * GstRtpJitterBuffer:max-ts-offset-adjustment: + * + * The maximum number of nanoseconds per frame that time offset may be + * adjusted with. This is used to avoid sudden large changes to time stamps. + */ + g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET_ADJUSTMENT, + g_param_spec_uint64 ("max-ts-offset-adjustment", + "Max Timestamp Offset Adjustment", + "The maximum number of nanoseconds per frame that time stamp " + "offsets may be adjusted (0 = no limit).", 0, G_MAXUINT64, + DEFAULT_MAX_TS_OFFSET_ADJUSTMENT, G_PARAM_READWRITE | + G_PARAM_STATIC_STRINGS)); + + /** * GstRtpJitterBuffer:do-lost: * * Send out a GstRTPPacketLost event downstream when a packet is considered @@ -590,13 +677,15 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) * this much packet reordering. * * When -1 is used, the value will be estimated based on observed packet - * reordering. + * reordering. When 0 is used packet reordering alone will not cause a + * retransmission event (Since 1.10). * * Since: 1.2 */ g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER, g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder", - "Sending retransmission event when this much reordering (-1 automatic)", + "Sending retransmission event when this much reordering " + "(0 disable)", -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** @@ -663,6 +752,47 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) "(-1 not limited)", -1, G_MAXINT, DEFAULT_RTX_MAX_RETRIES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** + * GstRtpJitterBuffer:rtx-deadline: + * + * The deadline for a valid RTX request in ms. + * + * How long the RTX RTCP will be valid for. + * When -1 is used, the size of the jitterbuffer will be used. + * + * Since: 1.10 + */ + g_object_class_install_property (gobject_class, PROP_RTX_DEADLINE, + g_param_spec_int ("rtx-deadline", "RTX Deadline (ms)", + "The deadline for a valid RTX request in milliseconds. " + "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DEADLINE, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +/** + * GstRtpJitterBuffer::rtx-stats-timeout: + * + * The time to wait for a retransmitted packet after it has been + * considered lost in order to collect RTX statistics. + * + * Since: 1.10 + */ + g_object_class_install_property (gobject_class, PROP_RTX_STATS_TIMEOUT, + g_param_spec_uint ("rtx-stats-timeout", "RTX Statistics Timeout", + "The time to wait for a retransmitted packet after it has been " + "considered lost in order to collect statistics (ms)", + 0, G_MAXUINT, DEFAULT_RTX_STATS_TIMEOUT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME, + g_param_spec_uint ("max-dropout-time", "Max dropout time", + "The maximum time (milliseconds) of missing packets tolerated.", + 0, G_MAXINT32, DEFAULT_MAX_DROPOUT_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME, + g_param_spec_uint ("max-misorder-time", "Max misorder time", + "The maximum time (milliseconds) of misordered packets tolerated.", + 0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** * GstRtpJitterBuffer:stats: * * Various jitterbuffer statistics. This property returns a GstStructure @@ -672,6 +802,34 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) * * * #guint64 + * "num-pushed": + * the number of packets pushed out. + * + * + * + * + * #guint64 + * "num-lost": + * the number of packets considered lost. + * + * + * + * + * #guint64 + * "num-late": + * the number of packets arriving too late. + * + * + * + * + * #guint64 + * "num-duplicates": + * the number of duplicate packets. + * + * + * + * + * #guint64 * "rtx-count": * the number of retransmissions requested. * @@ -707,6 +865,45 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); /** + * GstRtpJitterBuffer:max-rtcp-rtp-time-diff + * + * The maximum amount of time in ms that the RTP time in the RTCP SRs + * is allowed to be ahead of the last RTP packet we received. Use + * -1 to disable ignoring of RTCP packets. + * + * Since: 1.8 + */ + g_object_class_install_property (gobject_class, PROP_MAX_RTCP_RTP_TIME_DIFF, + g_param_spec_int ("max-rtcp-rtp-time-diff", "Max RTCP RTP Time Diff", + "Maximum amount of time in ms that the RTP time in RTCP SRs " + "is allowed to be ahead (-1 disabled)", -1, G_MAXINT, + DEFAULT_MAX_RTCP_RTP_TIME_DIFF, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC, + g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock", + "Synchronize received streams to the RFC7273 clock " + "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** + * GstRtpJitterBuffer:faststart-min-packets + * + * The number of consecutive packets needed to start (set to 0 to + * disable faststart. The jitterbuffer will by default start after the + * latency has elapsed) + * + * Since: 1.14 + */ + g_object_class_install_property (gobject_class, PROP_FASTSTART_MIN_PACKETS, + g_param_spec_uint ("faststart-min-packets", "Faststart minimum packets", + "The number of consecutive packets needed to start (set to 0 to " + "disable faststart. The jitterbuffer will by default start after " + "the latency has elapsed)", + 0, G_MAXUINT, DEFAULT_FASTSTART_MIN_PACKETS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** * GstRtpJitterBuffer::request-pt-map: * @buffer: the object which received the signal * @pt: the pt @@ -781,13 +978,15 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad); gstelement_class->provide_clock = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock); + gstelement_class->set_clock = + GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_clock); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template)); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template)); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template)); + gst_element_class_add_static_pad_template (gstelement_class, + &gst_rtp_jitter_buffer_src_template); + gst_element_class_add_static_pad_template (gstelement_class, + &gst_rtp_jitter_buffer_sink_template); + gst_element_class_add_static_pad_template (gstelement_class, + &gst_rtp_jitter_buffer_sink_rtcp_template); gst_element_class_set_static_metadata (gstelement_class, "RTP packet jitter-buffer", "Filter/Network/RTP", @@ -800,6 +999,7 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass) GST_DEBUG_CATEGORY_INIT (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer"); + GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_jitter_buffer_chain_rtcp); } static void @@ -807,12 +1007,14 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer) { GstRtpJitterBufferPrivate *priv; - priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer); + priv = gst_rtp_jitter_buffer_get_instance_private (jitterbuffer); jitterbuffer->priv = priv; priv->latency_ms = DEFAULT_LATENCY_MS; priv->latency_ns = priv->latency_ms * GST_MSECOND; priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY; + priv->ts_offset = DEFAULT_TS_OFFSET; + priv->max_ts_offset_adjustment = DEFAULT_MAX_TS_OFFSET_ADJUSTMENT; priv->do_lost = DEFAULT_DO_LOST; priv->do_retransmission = DEFAULT_DO_RETRANSMISSION; priv->rtx_next_seqnum = DEFAULT_RTX_NEXT_SEQNUM; @@ -823,17 +1025,29 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer) priv->rtx_min_retry_timeout = DEFAULT_RTX_MIN_RETRY_TIMEOUT; priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD; priv->rtx_max_retries = DEFAULT_RTX_MAX_RETRIES; - + priv->rtx_deadline_ms = DEFAULT_RTX_DEADLINE; + priv->rtx_stats_timeout = DEFAULT_RTX_STATS_TIMEOUT; + priv->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF; + priv->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME; + priv->max_misorder_time = DEFAULT_MAX_MISORDER_TIME; + priv->faststart_min_packets = DEFAULT_FASTSTART_MIN_PACKETS; + + priv->ts_offset_remainder = 0; priv->last_dts = -1; + priv->last_pts = -1; priv->last_rtptime = -1; priv->avg_jitter = 0; + priv->segment_seqnum = GST_SEQNUM_INVALID; priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData)); + priv->rtx_stats_timers = timer_queue_new (); priv->jbuf = rtp_jitter_buffer_new (); g_mutex_init (&priv->jbuf_lock); + g_cond_init (&priv->jbuf_queue); g_cond_init (&priv->jbuf_timer); g_cond_init (&priv->jbuf_event); g_cond_init (&priv->jbuf_query); g_queue_init (&priv->gap_packets); + gst_segment_init (&priv->segment, GST_FORMAT_TIME); /* reset skew detection initialy */ rtp_jitter_buffer_reset_skew (priv->jbuf); @@ -858,6 +1072,8 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer) gst_pad_set_chain_function (priv->sinkpad, GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain)); + gst_pad_set_chain_list_function (priv->sinkpad, + GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain_list)); gst_pad_set_event_function (priv->sinkpad, GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event)); gst_pad_set_query_function (priv->sinkpad, @@ -930,7 +1146,9 @@ gst_rtp_jitter_buffer_finalize (GObject * object) priv = jitterbuffer->priv; g_array_free (priv->timers, TRUE); + timer_queue_free (priv->rtx_stats_timers); g_mutex_clear (&priv->jbuf_lock); + g_cond_clear (&priv->jbuf_queue); g_cond_clear (&priv->jbuf_timer); g_cond_clear (&priv->jbuf_event); g_cond_clear (&priv->jbuf_query); @@ -1087,6 +1305,16 @@ gst_rtp_jitter_buffer_provide_clock (GstElement * element) return gst_system_clock_obtain (); } +static gboolean +gst_rtp_jitter_buffer_set_clock (GstElement * element, GstClock * clock) +{ + GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (element); + + rtp_jitter_buffer_set_pipeline_clock (jitterbuffer->priv->jbuf, clock); + + return GST_ELEMENT_CLASS (parent_class)->set_clock (element, clock); +} + static void gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer) { @@ -1133,7 +1361,7 @@ gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active, } if ((item = rtp_jitter_buffer_peek (priv->jbuf))) { /* head buffer timestamp and offset gives our output time */ - last_out = item->dts + priv->ts_offset; + last_out = item->pts + priv->ts_offset; } else { /* use last known time when the buffer is empty */ last_out = priv->last_out_time; @@ -1185,19 +1413,33 @@ gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter) static gboolean gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer, - GstCaps * caps) + GstCaps * caps, gint pt) { GstRtpJitterBufferPrivate *priv; GstStructure *caps_struct; guint val; + gint payload = -1; GstClockTime tval; + const gchar *ts_refclk, *mediaclk; priv = jitterbuffer->priv; /* first parse the caps */ caps_struct = gst_caps_get_structure (caps, 0); - GST_DEBUG_OBJECT (jitterbuffer, "got caps"); + GST_DEBUG_OBJECT (jitterbuffer, "got caps %" GST_PTR_FORMAT, caps); + + if (gst_structure_get_int (caps_struct, "payload", &payload) && pt != -1 + && payload != pt) { + GST_ERROR_OBJECT (jitterbuffer, + "Got caps with wrong payload type (got %d, expected %d)", pt, payload); + return FALSE; + } + + if (payload != -1) { + GST_DEBUG_OBJECT (jitterbuffer, "Got payload type %d", payload); + priv->last_pt = payload; + } /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to * measure the amount of data in the buffer */ @@ -1211,6 +1453,8 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer, rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate); + gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate); + /* The clock base is the RTP timestamp corrsponding to the npt-start value. We * can use this to track the amount of time elapsed on the sender. */ if (gst_structure_get_uint (caps_struct, "clock-base", &val)) @@ -1255,6 +1499,75 @@ gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer, "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT, GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop)); + if ((ts_refclk = gst_structure_get_string (caps_struct, "a-ts-refclk"))) { + GstClock *clock = NULL; + guint64 clock_offset = -1; + + GST_DEBUG_OBJECT (jitterbuffer, "Have timestamp reference clock %s", + ts_refclk); + + if (g_str_has_prefix (ts_refclk, "ntp=")) { + if (g_str_has_prefix (ts_refclk, "ntp=/traceable/")) { + GST_FIXME_OBJECT (jitterbuffer, "Can't handle traceable NTP clocks"); + } else { + const gchar *host, *portstr; + gchar *hostname; + guint port; + + host = ts_refclk + sizeof ("ntp=") - 1; + if (host[0] == '[') { + /* IPv6 */ + portstr = strchr (host, ']'); + if (portstr && portstr[1] == ':') + portstr = portstr + 1; + else + portstr = NULL; + } else { + portstr = strrchr (host, ':'); + } + + + if (!portstr || sscanf (portstr, ":%u", &port) != 1) + port = 123; + + if (portstr) + hostname = g_strndup (host, (portstr - host)); + else + hostname = g_strdup (host); + + clock = gst_ntp_clock_new (NULL, hostname, port, 0); + g_free (hostname); + } + } else if (g_str_has_prefix (ts_refclk, "ptp=IEEE1588-2008:")) { + const gchar *domainstr = + ts_refclk + sizeof ("ptp=IEEE1588-2008:XX-XX-XX-XX-XX-XX-XX-XX") - 1; + guint domain; + + if (domainstr[0] != ':' || sscanf (domainstr, ":%u", &domain) != 1) + domain = 0; + + clock = gst_ptp_clock_new (NULL, domain); + } else { + GST_FIXME_OBJECT (jitterbuffer, "Unsupported timestamp reference clock"); + } + + if ((mediaclk = gst_structure_get_string (caps_struct, "a-mediaclk"))) { + GST_DEBUG_OBJECT (jitterbuffer, "Got media clock %s", mediaclk); + + if (!g_str_has_prefix (mediaclk, "direct=") + || sscanf (mediaclk, "direct=%" G_GUINT64_FORMAT, &clock_offset) != 1) + GST_FIXME_OBJECT (jitterbuffer, "Unsupported media clock"); + if (strstr (mediaclk, "rate=") != NULL) { + GST_FIXME_OBJECT (jitterbuffer, "Rate property not supported"); + clock_offset = -1; + } + } + + rtp_jitter_buffer_set_media_clock (priv->jbuf, clock, clock_offset); + } else { + rtp_jitter_buffer_set_media_clock (priv->jbuf, NULL, -1); + } + return TRUE; /* ERRORS */ @@ -1284,6 +1597,7 @@ gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer) /* this unblocks any waiting pops on the src pad task */ JBUF_SIGNAL_EVENT (priv); JBUF_SIGNAL_QUERY (priv, FALSE); + JBUF_SIGNAL_QUEUE (priv); JBUF_UNLOCK (priv); } @@ -1300,11 +1614,11 @@ gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer) priv->srcresult = GST_FLOW_OK; gst_segment_init (&priv->segment, GST_FORMAT_TIME); priv->last_popped_seqnum = -1; - priv->last_out_time = -1; + priv->last_out_time = GST_CLOCK_TIME_NONE; priv->next_seqnum = -1; priv->seqnum_base = -1; priv->ips_rtptime = -1; - priv->ips_dts = GST_CLOCK_TIME_NONE; + priv->ips_pts = GST_CLOCK_TIME_NONE; priv->packet_spacing = 0; priv->next_in_seqnum = -1; priv->clock_rate = -1; @@ -1316,6 +1630,9 @@ gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer) priv->avg_jitter = 0; priv->last_dts = -1; priv->last_rtptime = -1; + priv->last_in_pts = 0; + priv->equidistant = 0; + priv->segment_seqnum = GST_SEQNUM_INVALID; GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer"); rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL); rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE); @@ -1386,6 +1703,7 @@ gst_rtp_jitter_buffer_change_state (GstElement * element, /* block until we go to PLAYING */ priv->blocked = TRUE; priv->timer_running = TRUE; + priv->srcresult = GST_FLOW_OK; priv->timer_thread = g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer); JBUF_UNLOCK (priv); @@ -1424,9 +1742,11 @@ gst_rtp_jitter_buffer_change_state (GstElement * element, JBUF_LOCK (priv); gst_buffer_replace (&priv->last_sr, NULL); priv->timer_running = FALSE; + priv->srcresult = GST_FLOW_FLUSHING; unschedule_current_timer (jitterbuffer); JBUF_SIGNAL_TIMER (priv); JBUF_SIGNAL_QUERY (priv, FALSE); + JBUF_SIGNAL_QUEUE (priv); JBUF_UNLOCK (priv); g_thread_join (priv->timer_thread); priv->timer_thread = NULL; @@ -1499,19 +1819,29 @@ queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event) GstCaps *caps; gst_event_parse_caps (event, &caps); - gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps); + gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, -1); break; } case GST_EVENT_SEGMENT: - gst_event_copy_segment (event, &priv->segment); + { + GstSegment segment; + gst_event_copy_segment (event, &segment); + + priv->segment_seqnum = gst_event_get_seqnum (event); /* we need time for now */ - if (priv->segment.format != GST_FORMAT_TIME) - goto newseg_wrong_format; + if (segment.format != GST_FORMAT_TIME) { + GST_DEBUG_OBJECT (jitterbuffer, "ignoring non-TIME newsegment"); + gst_event_unref (event); - GST_DEBUG_OBJECT (jitterbuffer, - "segment: %" GST_SEGMENT_FORMAT, &priv->segment); + gst_segment_init (&segment, GST_FORMAT_TIME); + event = gst_event_new_segment (&segment); + gst_event_set_seqnum (event, priv->segment_seqnum); + } + + priv->segment = segment; break; + } case GST_EVENT_EOS: priv->eos = TRUE; rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE); @@ -1524,18 +1854,10 @@ queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event) GST_DEBUG_OBJECT (jitterbuffer, "adding event"); item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1); rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL); - if (head) + if (head || priv->eos) JBUF_SIGNAL_EVENT (priv); return TRUE; - - /* ERRORS */ -newseg_wrong_format: - { - GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment"); - gst_event_unref (event); - return FALSE; - } } static gboolean @@ -1671,7 +1993,7 @@ gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer, if (!caps) goto no_caps; - res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps); + res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, pt); gst_caps_unref (caps); if (G_UNLIKELY (!res)) @@ -1718,6 +2040,32 @@ check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent) return message; } +static void +update_offset (GstRtpJitterBuffer * jitterbuffer) +{ + GstRtpJitterBufferPrivate *priv; + + priv = jitterbuffer->priv; + + if (priv->ts_offset_remainder != 0) { + GST_DEBUG ("adjustment %" G_GUINT64_FORMAT " remain %" G_GINT64_FORMAT + " off %" G_GINT64_FORMAT, priv->max_ts_offset_adjustment, + priv->ts_offset_remainder, priv->ts_offset); + if (ABS (priv->ts_offset_remainder) > priv->max_ts_offset_adjustment) { + if (priv->ts_offset_remainder > 0) { + priv->ts_offset += priv->max_ts_offset_adjustment; + priv->ts_offset_remainder -= priv->max_ts_offset_adjustment; + } else { + priv->ts_offset -= priv->max_ts_offset_adjustment; + priv->ts_offset_remainder += priv->max_ts_offset_adjustment; + } + } else { + priv->ts_offset += priv->ts_offset_remainder; + priv->ts_offset_remainder = 0; + } + } +} + static GstClockTime apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp) { @@ -1736,8 +2084,70 @@ apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp) return timestamp; } +static TimerQueue * +timer_queue_new (void) +{ + TimerQueue *queue; + + queue = g_slice_new (TimerQueue); + queue->timers = g_queue_new (); + queue->hashtable = g_hash_table_new (NULL, NULL); + + return queue; +} + +static void +timer_queue_free (TimerQueue * queue) +{ + if (!queue) + return; + + g_hash_table_destroy (queue->hashtable); + g_queue_free_full (queue->timers, g_free); + g_slice_free (TimerQueue, queue); +} + +static void +timer_queue_append (TimerQueue * queue, const TimerData * timer, + GstClockTime timeout, gboolean lost) +{ + TimerData *copy; + + copy = g_memdup (timer, sizeof (*timer)); + copy->timeout = timeout; + copy->type = lost ? TIMER_TYPE_LOST : TIMER_TYPE_EXPECTED; + copy->idx = -1; + + GST_LOG ("Append rtx-stats timer #%d, %" GST_TIME_FORMAT, + copy->seqnum, GST_TIME_ARGS (copy->timeout)); + g_queue_push_tail (queue->timers, copy); + g_hash_table_insert (queue->hashtable, GINT_TO_POINTER (copy->seqnum), copy); +} + +static void +timer_queue_clear_until (TimerQueue * queue, GstClockTime timeout) +{ + TimerData *test; + + test = g_queue_peek_head (queue->timers); + while (test && test->timeout < timeout) { + GST_LOG ("Pop rtx-stats timer #%d, %" GST_TIME_FORMAT " < %" + GST_TIME_FORMAT, test->seqnum, GST_TIME_ARGS (test->timeout), + GST_TIME_ARGS (timeout)); + g_hash_table_remove (queue->hashtable, GINT_TO_POINTER (test->seqnum)); + g_free (g_queue_pop_head (queue->timers)); + test = g_queue_peek_head (queue->timers); + } +} + +static TimerData * +timer_queue_find (TimerQueue * queue, guint16 seqnum) +{ + return g_hash_table_lookup (queue->hashtable, GINT_TO_POINTER (seqnum)); +} + static TimerData * -find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum) +find_timer (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; TimerData *timer = NULL; @@ -1746,7 +2156,7 @@ find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum) len = priv->timers->len; for (i = 0; i < len; i++) { TimerData *test = &g_array_index (priv->timers, TimerData, i); - if (test->seqnum == seqnum && test->type == type) { + if (test->seqnum == seqnum) { timer = test; break; } @@ -1827,7 +2237,9 @@ add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, timer->rtx_delay = delay; timer->rtx_retry = 0; } + timer->rtx_last = GST_CLOCK_TIME_NONE; timer->num_rtx_retry = 0; + timer->num_rtx_received = 0; recalculate_timer (jitterbuffer, timer); JBUF_SIGNAL_TIMER (priv); @@ -1841,28 +2253,39 @@ reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; gboolean seqchange, timechange; guint16 oldseq; + GstClockTime new_timeout; - seqchange = timer->seqnum != seqnum; - timechange = timer->timeout != timeout; + oldseq = timer->seqnum; + new_timeout = timeout + delay; + seqchange = oldseq != seqnum; + timechange = timer->timeout != new_timeout; - if (!seqchange && !timechange) + if (!seqchange && !timechange) { + GST_DEBUG_OBJECT (jitterbuffer, + "No changes in seqnum (%d) and timeout (%" GST_TIME_FORMAT + "), skipping", oldseq, GST_TIME_ARGS (timer->timeout)); return; - - oldseq = timer->seqnum; + } GST_DEBUG_OBJECT (jitterbuffer, - "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT, - oldseq, seqnum, GST_TIME_ARGS (timeout + delay)); + "replace timer %d for seqnum %d->%d timeout %" GST_TIME_FORMAT + "->%" GST_TIME_FORMAT, timer->type, oldseq, seqnum, + GST_TIME_ARGS (timer->timeout), GST_TIME_ARGS (new_timeout)); - timer->timeout = timeout + delay; + timer->timeout = new_timeout; timer->seqnum = seqnum; if (reset) { + GST_DEBUG_OBJECT (jitterbuffer, "reset rtx delay %" GST_TIME_FORMAT + "->%" GST_TIME_FORMAT, GST_TIME_ARGS (timer->rtx_delay), + GST_TIME_ARGS (delay)); timer->rtx_base = timeout; timer->rtx_delay = delay; timer->rtx_retry = 0; } - if (seqchange) + if (seqchange) { timer->num_rtx_retry = 0; + timer->num_rtx_received = 0; + } if (priv->clock_id) { /* we changed the seqnum and there is a timer currently waiting with this @@ -1883,7 +2306,7 @@ set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, TimerData *timer; /* find the seqnum timer */ - timer = find_timer (jitterbuffer, type, seqnum); + timer = find_timer (jitterbuffer, seqnum); if (timer == NULL) { timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1); } else { @@ -1898,6 +2321,9 @@ remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer) GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; guint idx; + if (timer->idx == -1) + return; + if (priv->clock_id && priv->timer_seqnum == timer->seqnum) unschedule_current_timer (jitterbuffer); @@ -1905,6 +2331,8 @@ remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer) GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx); g_array_remove_index_fast (priv->timers, idx); timer->idx = idx; + + JBUF_SIGNAL_TIMER (priv); } static void @@ -1914,6 +2342,7 @@ remove_all_timers (GstRtpJitterBuffer * jitterbuffer) GST_DEBUG_OBJECT (jitterbuffer, "removed all timers"); g_array_set_size (priv->timers, 0); unschedule_current_timer (jitterbuffer); + JBUF_SIGNAL_TIMER (priv); } /* get the extra delay to wait before sending RTX */ @@ -1939,6 +2368,30 @@ get_rtx_delay (GstRtpJitterBufferPrivate * priv) return delay; } +/* Check if packet with seqnum is already considered definitely lost by being + * part of a "lost timer" for multiple packets */ +static gboolean +already_lost (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum) +{ + GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; + gint i, len; + + len = priv->timers->len; + for (i = 0; i < len; i++) { + TimerData *test = &g_array_index (priv->timers, TimerData, i); + gint gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum); + + if (test->num > 1 && test->type == TIMER_TYPE_LOST && gap >= 0 && + gap < test->num) { + GST_DEBUG ("seqnum #%d already considered definitely lost (#%d->#%d)", + seqnum, test->seqnum, (test->seqnum + test->num - 1) & 0xffff); + return TRUE; + } + } + + return FALSE; +} + /* we just received a packet with seqnum and dts. * * First check for old seqnum that we are still expecting. If the gap with the @@ -1951,37 +2404,30 @@ get_rtx_delay (GstRtpJitterBufferPrivate * priv) */ static void update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum, - GstClockTime dts, gboolean do_next_seqnum) + GstClockTime dts, GstClockTime pts, gboolean do_next_seqnum, + gboolean is_rtx, TimerData * timer) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - TimerData *timer = NULL; - gint i, len; - /* go through all timers and unschedule the ones with a large gap, also find - * the timer for the seqnum */ - len = priv->timers->len; - for (i = 0; i < len; i++) { - TimerData *test = &g_array_index (priv->timers, TimerData, i); - gint gap; + /* go through all timers and unschedule the ones with a large gap */ + if (priv->do_retransmission && priv->rtx_delay_reorder > 0) { + gint i, len; + len = priv->timers->len; + for (i = 0; i < len; i++) { + TimerData *test = &g_array_index (priv->timers, TimerData, i); + gint gap; - gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum); + gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum); - GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, #%d<->#%d gap %d", i, - test->type, test->seqnum, seqnum, gap); + GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d", + test->type, test->seqnum, seqnum, gap); - if (gap == 0) { - GST_DEBUG ("found timer for current seqnum"); - /* the timer for the current seqnum */ - timer = test; - /* when no retransmission, we can stop now, we only need to find the - * timer for the current seqnum */ - if (!priv->do_retransmission) - break; - } else if (gap > priv->rtx_delay_reorder) { - /* max gap, we exceeded the max reorder distance and we don't expect the - * missing packet to be this reordered */ - if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED) - reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE); + if (gap > priv->rtx_delay_reorder) { + /* max gap, we exceeded the max reorder distance and we don't expect the + * missing packet to be this reordered */ + if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED) + reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE); + } } } @@ -1990,61 +2436,49 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum, if (timer && timer->type != TIMER_TYPE_DEADLINE) { if (timer->num_rtx_retry > 0) { - GstClockTime rtx_last, delay; - - /* we scheduled a retry for this packet and now we have it */ - priv->num_rtx_success++; - /* all the previous retry attempts failed */ - priv->num_rtx_failed += timer->num_rtx_retry - 1; - /* number of retries before receiving the packet */ - if (priv->avg_rtx_num == 0.0) - priv->avg_rtx_num = timer->num_rtx_retry; - else - priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8; - /* calculate the delay between retransmission request and receiving this - * packet, start with when we scheduled this timeout last */ - rtx_last = timer->rtx_last; - if (dts != GST_CLOCK_TIME_NONE && dts > rtx_last) { - /* we have a valid delay if this packet arrived after we scheduled the - * request */ - delay = dts - rtx_last; - if (priv->avg_rtx_rtt == 0) - priv->avg_rtx_rtt = delay; - else - priv->avg_rtx_rtt = (delay + 7 * priv->avg_rtx_rtt) / 8; - } else - delay = 0; - - GST_LOG_OBJECT (jitterbuffer, - "RTX success %" G_GUINT64_FORMAT ", failed %" G_GUINT64_FORMAT - ", requests %" G_GUINT64_FORMAT ", dups %" G_GUINT64_FORMAT - ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %" GST_TIME_FORMAT, - priv->num_rtx_success, priv->num_rtx_failed, priv->num_rtx_requests, - priv->num_duplicates, priv->avg_rtx_num, GST_TIME_ARGS (delay), - GST_TIME_ARGS (priv->avg_rtx_rtt)); - - /* don't try to estimate the next seqnum because this is a retransmitted - * packet and it probably did not arrive with the expected packet - * spacing. */ - do_next_seqnum = FALSE; + if (is_rtx) { + update_rtx_stats (jitterbuffer, timer, dts, TRUE); + /* don't try to estimate the next seqnum because this is a retransmitted + * packet and it probably did not arrive with the expected packet + * spacing. */ + do_next_seqnum = FALSE; + } + + if (!is_rtx || timer->num_rtx_retry > 1) { + /* Store timer in order to record stats when/if the retransmitted + * packet arrives. We should also store timer information if we've + * requested retransmission more than once since we may receive + * several retransmitted packets. For accuracy we should update the + * stats also when the redundant retransmitted packets arrives. */ + timer_queue_append (priv->rtx_stats_timers, timer, + pts + priv->rtx_stats_timeout * GST_MSECOND, FALSE); + } } } - if (do_next_seqnum) { + if (do_next_seqnum && pts != GST_CLOCK_TIME_NONE) { GstClockTime expected, delay; /* calculate expected arrival time of the next seqnum */ - expected = dts + priv->packet_spacing; + expected = pts + priv->packet_spacing; delay = get_rtx_delay (priv); /* and update/install timer for next seqnum */ - if (timer) + GST_DEBUG_OBJECT (jitterbuffer, "Add RTX timer #%d, expected %" + GST_TIME_FORMAT ", delay %" GST_TIME_FORMAT ", packet-spacing %" + GST_TIME_FORMAT ", jitter %" GST_TIME_FORMAT, priv->next_in_seqnum, + GST_TIME_ARGS (expected), GST_TIME_ARGS (delay), + GST_TIME_ARGS (priv->packet_spacing), GST_TIME_ARGS (priv->avg_jitter)); + + if (timer) { + timer->type = TIMER_TYPE_EXPECTED; reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected, delay, TRUE); - else + } else { add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0, expected, delay, priv->packet_spacing); + } } else if (timer && timer->type != TIMER_TYPE_DEADLINE) { /* if we had a timer, remove it, we don't know when to expect the next * packet. */ @@ -2054,16 +2488,16 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum, static void calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime, - GstClockTime dts) + GstClockTime pts) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; /* we need consecutive seqnums with a different * rtptime to estimate the packet spacing. */ if (priv->ips_rtptime != rtptime) { - /* rtptime changed, check dts diff */ - if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) { - GstClockTime new_packet_spacing = dts - priv->ips_dts; + /* rtptime changed, check pts diff */ + if (priv->ips_pts != -1 && pts != -1 && pts > priv->ips_pts) { + GstClockTime new_packet_spacing = pts - priv->ips_pts; GstClockTime old_packet_spacing = priv->packet_spacing; /* Biased towards bigger packet spacings to prevent @@ -2088,98 +2522,122 @@ calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime, GST_TIME_ARGS (priv->packet_spacing)); } priv->ips_rtptime = rtptime; - priv->ips_dts = dts; + priv->ips_pts = pts; } } static void calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected, - guint16 seqnum, GstClockTime dts, gint gap) + guint16 seqnum, GstClockTime pts, gint gap) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - GstClockTime total_duration, duration, expected_dts; + GstClockTime duration, expected_pts, delay; TimerType type; - guint lost_packets = 0; + gboolean equidistant = priv->equidistant > 0; GST_DEBUG_OBJECT (jitterbuffer, - "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT, - GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts)); + "pts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT, + GST_TIME_ARGS (pts), GST_TIME_ARGS (priv->last_in_pts)); - /* the total duration spanned by the missing packets */ - if (dts >= priv->last_in_dts) - total_duration = dts - priv->last_in_dts; - else - total_duration = 0; - - /* interpolate between the current time and the last time based on - * number of packets we are missing, this is the estimated duration - * for the missing packet based on equidistant packet spacing. */ - duration = total_duration / (gap + 1); + if (pts == GST_CLOCK_TIME_NONE) { + GST_WARNING_OBJECT (jitterbuffer, "Have no PTS"); + return; + } - GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT, - GST_TIME_ARGS (duration)); + if (equidistant) { + GstClockTime total_duration; + /* the total duration spanned by the missing packets */ + if (pts >= priv->last_in_pts) + total_duration = pts - priv->last_in_pts; + else + total_duration = 0; - if (total_duration > priv->latency_ns) { - GstClockTime gap_time; + /* interpolate between the current time and the last time based on + * number of packets we are missing, this is the estimated duration + * for the missing packet based on equidistant packet spacing. */ + duration = total_duration / (gap + 1); - gap_time = total_duration - priv->latency_ns; + GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT, + GST_TIME_ARGS (duration)); - if (duration > 0) { - lost_packets = gap_time / duration; - gap_time = lost_packets * duration; - } else { - lost_packets = gap; - } + if (total_duration > priv->latency_ns) { + GstClockTime gap_time; + guint lost_packets; - /* too many lost packets, some of the missing packets are already - * too late and we can generate lost packet events for them. */ - GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT - " > %" GST_TIME_FORMAT ", consider %u lost", - GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns), - lost_packets); + if (duration > 0) { + GstClockTime gap_dur = gap * duration; + if (gap_dur > priv->latency_ns) + gap_time = gap_dur - priv->latency_ns; + else + gap_time = 0; + lost_packets = gap_time / duration; + } else { + gap_time = total_duration - priv->latency_ns; + lost_packets = gap; + } - /* this timer will fire immediately and the lost event will be pushed from - * the timer thread */ - add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets, - priv->last_in_dts + duration, 0, gap_time); + /* too many lost packets, some of the missing packets are already + * too late and we can generate lost packet events for them. */ + GST_INFO_OBJECT (jitterbuffer, + "lost packets (%d, #%d->#%d) duration too large %" GST_TIME_FORMAT + " > %" GST_TIME_FORMAT ", consider %u lost (%" GST_TIME_FORMAT ")", + gap, expected, seqnum - 1, GST_TIME_ARGS (total_duration), + GST_TIME_ARGS (priv->latency_ns), lost_packets, + GST_TIME_ARGS (gap_time)); + + /* this timer will fire immediately and the lost event will be pushed from + * the timer thread */ + if (lost_packets > 0) { + add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets, + priv->last_in_pts + duration, 0, gap_time); + expected += lost_packets; + priv->last_in_pts += gap_time; + } + } - expected += lost_packets; - priv->last_in_dts += gap_time; + expected_pts = priv->last_in_pts + duration; + } else { + /* If we cannot assume equidistant packet spacing, the only thing we now + * for sure is that the missing packets have expected pts not later than + * the last received pts. */ + duration = 0; + expected_pts = pts; } - expected_dts = priv->last_in_dts + (lost_packets + 1) * duration; + delay = 0; if (priv->do_retransmission) { - TimerData *timer; + TimerData *timer = find_timer (jitterbuffer, expected); type = TIMER_TYPE_EXPECTED; + delay = get_rtx_delay (priv); + /* if we had a timer for the first missing packet, update it. */ - if ((timer = find_timer (jitterbuffer, type, expected))) { + if (timer && timer->type == TIMER_TYPE_EXPECTED) { GstClockTime timeout = timer->timeout; timer->duration = duration; - if (timeout > (expected_dts + timer->rtx_retry)) { - GstClockTime delay = timeout - expected_dts - timer->rtx_retry; - reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_dts, + if (timeout > (expected_pts + delay) && timer->num_rtx_retry == 0) { + reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_pts, delay, TRUE); } expected++; - expected_dts += duration; + expected_pts += duration; } } else { type = TIMER_TYPE_LOST; } while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) { - add_timer (jitterbuffer, type, expected, 0, expected_dts, 0, duration); - expected_dts += duration; + add_timer (jitterbuffer, type, expected, 0, expected_pts, delay, duration); + expected_pts += duration; expected++; } } static void calculate_jitter (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts, - guint rtptime) + guint32 rtptime) { gint32 rtpdiff; GstClockTimeDiff dtsdiff, rtpdiffns, diff; @@ -2200,6 +2658,15 @@ calculate_jitter (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts, else rtpdiff = 0; + /* Guess whether stream currently uses equidistant packet spacing. If we + * often see identical timestamps it means the packets are not + * equidistant. */ + if (rtptime == priv->last_rtptime) + priv->equidistant -= 2; + else + priv->equidistant += 1; + priv->equidistant = CLAMP (priv->equidistant, -7, 7); + priv->last_dts = dts; priv->last_rtptime = rtptime; @@ -2251,12 +2718,13 @@ compare_buffer_seqnum (GstBuffer * a, GstBuffer * b, gpointer user_data) } static gboolean -handle_big_gap_buffer (GstRtpJitterBuffer * jitterbuffer, gboolean future, - GstBuffer * buffer, guint8 pt, guint16 seqnum, gint gap) +handle_big_gap_buffer (GstRtpJitterBuffer * jitterbuffer, GstBuffer * buffer, + guint8 pt, guint16 seqnum, gint gap, guint max_dropout, guint max_misorder) { GstRtpJitterBufferPrivate *priv; guint gap_packets_length; gboolean reset = FALSE; + gboolean future = gap > 0; priv = jitterbuffer->priv; @@ -2294,7 +2762,7 @@ handle_big_gap_buffer (GstRtpJitterBuffer * jitterbuffer, gboolean future, GST_DEBUG_OBJECT (jitterbuffer, "buffer too %s %d < %d, got 5 consecutive ones - reset", (future ? "new" : "old"), gap, - (future ? RTP_MAX_DROPOUT : -RTP_MAX_MISORDER)); + (future ? max_dropout : -max_misorder)); reset = TRUE; } else if (!all_consecutive) { g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL); @@ -2302,20 +2770,19 @@ handle_big_gap_buffer (GstRtpJitterBuffer * jitterbuffer, gboolean future, GST_DEBUG_OBJECT (jitterbuffer, "buffer too %s %d < %d, got no 5 consecutive ones - dropping", (future ? "new" : "old"), gap, - (future ? RTP_MAX_DROPOUT : -RTP_MAX_MISORDER)); + (future ? max_dropout : -max_misorder)); buffer = NULL; } else { GST_DEBUG_OBJECT (jitterbuffer, "buffer too %s %d < %d, got %u consecutive ones - waiting", (future ? "new" : "old"), gap, - (future ? RTP_MAX_DROPOUT : -RTP_MAX_MISORDER), - gap_packets_length + 1); + (future ? max_dropout : -max_misorder), gap_packets_length + 1); buffer = NULL; } } else { GST_DEBUG_OBJECT (jitterbuffer, "buffer too %s %d < %d, first one - waiting", (future ? "new" : "old"), - gap, -RTP_MAX_MISORDER); + gap, -max_misorder); g_queue_push_tail (&priv->gap_packets, buffer); buffer = NULL; } @@ -2323,6 +2790,131 @@ handle_big_gap_buffer (GstRtpJitterBuffer * jitterbuffer, gboolean future, return reset; } +static GstClockTime +get_current_running_time (GstRtpJitterBuffer * jitterbuffer) +{ + GstClock *clock = gst_element_get_clock (GST_ELEMENT_CAST (jitterbuffer)); + GstClockTime running_time = GST_CLOCK_TIME_NONE; + + if (clock) { + GstClockTime base_time = + gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)); + GstClockTime clock_time = gst_clock_get_time (clock); + + if (clock_time > base_time) + running_time = clock_time - base_time; + else + running_time = 0; + + gst_object_unref (clock); + } + + return running_time; +} + +static GstFlowReturn +gst_rtp_jitter_buffer_reset (GstRtpJitterBuffer * jitterbuffer, + GstPad * pad, GstObject * parent, guint16 seqnum) +{ + GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; + GstFlowReturn ret = GST_FLOW_OK; + GList *events = NULL, *l; + GList *buffers; + gboolean head; + + GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer"); + rtp_jitter_buffer_flush (priv->jbuf, + (GFunc) free_item_and_retain_events, &events); + rtp_jitter_buffer_reset_skew (priv->jbuf); + remove_all_timers (jitterbuffer); + priv->discont = TRUE; + priv->last_popped_seqnum = -1; + + if (priv->gap_packets.head) { + GstBuffer *gap_buffer = priv->gap_packets.head->data; + GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT; + + gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp); + priv->next_seqnum = gst_rtp_buffer_get_seq (&gap_rtp); + gst_rtp_buffer_unmap (&gap_rtp); + } else { + priv->next_seqnum = seqnum; + } + + priv->last_in_pts = -1; + priv->next_in_seqnum = -1; + + /* Insert all sticky events again in order, otherwise we would + * potentially loose STREAM_START, CAPS or SEGMENT events + */ + events = g_list_reverse (events); + for (l = events; l; l = l->next) { + RTPJitterBufferItem *item; + + item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1); + rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL); + } + g_list_free (events); + + JBUF_SIGNAL_EVENT (priv); + + /* reset spacing estimation when gap */ + priv->ips_rtptime = -1; + priv->ips_pts = GST_CLOCK_TIME_NONE; + + buffers = g_list_copy (priv->gap_packets.head); + g_queue_clear (&priv->gap_packets); + + priv->ips_rtptime = -1; + priv->ips_pts = GST_CLOCK_TIME_NONE; + JBUF_UNLOCK (jitterbuffer->priv); + + for (l = buffers; l; l = l->next) { + ret = gst_rtp_jitter_buffer_chain (pad, parent, l->data); + l->data = NULL; + if (ret != GST_FLOW_OK) { + l = l->next; + break; + } + } + for (; l; l = l->next) + gst_buffer_unref (l->data); + g_list_free (buffers); + + return ret; +} + +static gboolean +gst_rtp_jitter_buffer_fast_start (GstRtpJitterBuffer * jitterbuffer) +{ + GstRtpJitterBufferPrivate *priv; + RTPJitterBufferItem *item; + TimerData *timer; + + priv = jitterbuffer->priv; + + if (priv->faststart_min_packets == 0) + return FALSE; + + item = rtp_jitter_buffer_peek (priv->jbuf); + if (!item) + return FALSE; + + timer = find_timer (jitterbuffer, item->seqnum); + if (!timer || timer->type != TIMER_TYPE_DEADLINE) + return FALSE; + + if (rtp_jitter_buffer_can_fast_start (priv->jbuf, + priv->faststart_min_packets)) { + GST_INFO_OBJECT (jitterbuffer, "We found %i consecutive packet, start now", + priv->faststart_min_packets); + timer->timeout = -1; + return TRUE; + } + + return FALSE; +} + static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) @@ -2341,6 +2933,9 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, gboolean do_next_seqnum = FALSE; RTPJitterBufferItem *item; GstMessage *msg = NULL; + gboolean estimated_dts = FALSE; + gint32 packet_rate, max_dropout, max_misorder; + TimerData *timer = NULL; jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent); @@ -2362,16 +2957,31 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, else if (pts == -1) pts = dts; - /* take the DTS of the buffer. This is the time when the packet was - * received and is used to calculate jitter and clock skew. We will adjust - * this DTS with the smoothed value after processing it in the - * jitterbuffer and assign it as the PTS. */ - /* bring to running time */ - dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts); + if (dts == -1) { + /* If we have no DTS here, i.e. no capture time, get one from the + * clock now to have something to calculate with in the future. */ + dts = get_current_running_time (jitterbuffer); + pts = dts; + + /* Remember that we estimated the DTS if we are running already + * and this is not our first packet (or first packet after a reset). + * If it's the first packet, we somehow must generate a timestamp for + * everything, otherwise we can't calculate any times + */ + estimated_dts = (priv->next_in_seqnum != -1); + } else { + /* take the DTS of the buffer. This is the time when the packet was + * received and is used to calculate jitter and clock skew. We will adjust + * this DTS with the smoothed value after processing it in the + * jitterbuffer and assign it as the PTS. */ + /* bring to running time */ + dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts); + } GST_DEBUG_OBJECT (jitterbuffer, - "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d", seqnum, - GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer)); + "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d, rtx %d", + seqnum, GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer), + GST_BUFFER_IS_RETRANSMISSION (buffer)); JBUF_LOCK_CHECK (priv, out_flushing); @@ -2388,7 +2998,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, /* Try to get the clock-rate from the caps first if we can. If there are no * caps we must fire the signal to get the clock-rate. */ if ((caps = gst_pad_get_current_caps (pad))) { - gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps); + gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, pt); gst_caps_unref (caps); } } @@ -2401,13 +3011,16 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, if (G_UNLIKELY (priv->clock_rate == -1)) goto no_clock_rate; + + gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate); } /* don't accept more data on EOS */ if (G_UNLIKELY (priv->eos)) goto have_eos; - calculate_jitter (jitterbuffer, dts, rtptime); + if (!GST_BUFFER_IS_RETRANSMISSION (buffer)) + calculate_jitter (jitterbuffer, dts, rtptime); if (priv->seqnum_base != -1) { gint gap; @@ -2419,7 +3032,6 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, "packet seqnum #%d before seqnum-base #%d", seqnum, priv->seqnum_base); gst_buffer_unref (buffer); - ret = GST_FLOW_OK; goto finished; } else if (gap > 16384) { /* From now on don't compare against the seqnum base anymore as @@ -2431,141 +3043,128 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, expected = priv->next_in_seqnum; + packet_rate = + gst_rtp_packet_rate_ctx_update (&priv->packet_rate_ctx, seqnum, rtptime); + max_dropout = + gst_rtp_packet_rate_ctx_get_max_dropout (&priv->packet_rate_ctx, + priv->max_dropout_time); + max_misorder = + gst_rtp_packet_rate_ctx_get_max_misorder (&priv->packet_rate_ctx, + priv->max_misorder_time); + GST_TRACE_OBJECT (jitterbuffer, + "packet_rate: %d, max_dropout: %d, max_misorder: %d", packet_rate, + max_dropout, max_misorder); + /* now check against our expected seqnum */ - if (G_LIKELY (expected != -1)) { - gint gap; + if (G_UNLIKELY (expected == -1)) { + GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum); + + /* calculate a pts based on rtptime and arrival time (dts) */ + pts = + rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts, + rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer))); + + /* we don't know what the next_in_seqnum should be, wait for the last + * possible moment to push this buffer, maybe we get an earlier seqnum + * while we wait */ + set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, pts); + + do_next_seqnum = TRUE; + /* take rtptime and pts to calculate packet spacing */ + priv->ips_rtptime = rtptime; + priv->ips_pts = pts; + } else { + gint gap; /* now calculate gap */ gap = gst_rtp_buffer_compare_seqnum (expected, seqnum); - GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d", expected, seqnum, gap); - if (G_LIKELY (gap == 0)) { - /* packet is expected */ - calculate_packet_spacing (jitterbuffer, rtptime, dts); - do_next_seqnum = TRUE; - } else { - gboolean reset = FALSE; - - if (!GST_CLOCK_TIME_IS_VALID (dts)) { - /* We would run into calculations with GST_CLOCK_TIME_NONE below - * and can't compensate for anything without DTS on RTP packets - */ - goto gap_but_no_dts; - } else if (gap < 0) { - /* we received an old packet */ - if (G_UNLIKELY (gap != -1 && gap < -RTP_MAX_MISORDER)) { - reset = - handle_big_gap_buffer (jitterbuffer, FALSE, buffer, pt, seqnum, - gap); - buffer = NULL; - } else { - GST_DEBUG_OBJECT (jitterbuffer, "old packet received"); - } - } else { - /* new packet, we are missing some packets */ - if (G_UNLIKELY (gap >= RTP_MAX_DROPOUT)) { - reset = - handle_big_gap_buffer (jitterbuffer, TRUE, buffer, pt, seqnum, - gap); - buffer = NULL; - } else { - GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap); - /* fill in the gap with EXPECTED timers */ - calculate_expected (jitterbuffer, expected, seqnum, dts, gap); + if (G_UNLIKELY (gap > 0 && priv->timers->len >= max_dropout)) { + /* If we have timers for more than RTP_MAX_DROPOUT packets + * pending this means that we have a huge gap overall. We can + * reset the jitterbuffer at this point because there's + * just too much data missing to be able to do anything + * sensible with the past data. Just try again from the + * next packet */ + GST_WARNING_OBJECT (jitterbuffer, "%d pending timers > %d - resetting", + priv->timers->len, max_dropout); + gst_buffer_unref (buffer); + return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum); + } - do_next_seqnum = TRUE; - } + /* Special handling of large gaps */ + if ((gap != -1 && gap < -max_misorder) || (gap >= max_dropout)) { + gboolean reset = handle_big_gap_buffer (jitterbuffer, buffer, pt, seqnum, + gap, max_dropout, max_misorder); + if (reset) { + return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum); + } else { + GST_DEBUG_OBJECT (jitterbuffer, + "Had big gap, waiting for more consecutive packets"); + goto finished; } - if (G_UNLIKELY (reset)) { - GList *events = NULL, *l; - GList *buffers; - - GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer"); - rtp_jitter_buffer_flush (priv->jbuf, - (GFunc) free_item_and_retain_events, &events); - rtp_jitter_buffer_reset_skew (priv->jbuf); - remove_all_timers (jitterbuffer); - priv->discont = TRUE; - priv->last_popped_seqnum = -1; - priv->next_seqnum = seqnum; - - priv->last_in_seqnum = -1; - priv->last_in_dts = -1; - priv->next_in_seqnum = -1; - - /* Insert all sticky events again in order, otherwise we would - * potentially loose STREAM_START, CAPS or SEGMENT events - */ - events = g_list_reverse (events); - for (l = events; l; l = l->next) { - RTPJitterBufferItem *item; - - item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1); - rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL); - } - g_list_free (events); - - JBUF_SIGNAL_EVENT (priv); - - /* reset spacing estimation when gap */ - priv->ips_rtptime = -1; - priv->ips_dts = GST_CLOCK_TIME_NONE; + } - buffers = g_list_copy (priv->gap_packets.head); - g_queue_clear (&priv->gap_packets); + /* We had no huge gap, let's drop all the gap packets */ + GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets"); + g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL); + g_queue_clear (&priv->gap_packets); - priv->ips_rtptime = -1; - priv->ips_dts = GST_CLOCK_TIME_NONE; - JBUF_UNLOCK (jitterbuffer->priv); + /* calculate a pts based on rtptime and arrival time (dts) */ + /* If we estimated the DTS, don't consider it in the clock skew calculations */ + pts = + rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts, + rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer))); - for (l = buffers; l; l = l->next) { - ret = gst_rtp_jitter_buffer_chain (pad, parent, l->data); - l->data = NULL; - if (ret != GST_FLOW_OK) - break; - } - for (; l; l = l->next) - gst_buffer_unref (l->data); - g_list_free (buffers); + if (G_LIKELY (gap == 0)) { + /* packet is expected */ + calculate_packet_spacing (jitterbuffer, rtptime, pts); + do_next_seqnum = TRUE; + } else { - return ret; + /* we have a gap */ + if (gap > 0) { + GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap); + /* fill in the gap with EXPECTED timers */ + calculate_expected (jitterbuffer, expected, seqnum, pts, gap); + do_next_seqnum = TRUE; + } else { + GST_DEBUG_OBJECT (jitterbuffer, "old packet received"); + do_next_seqnum = FALSE; } + /* reset spacing estimation when gap */ priv->ips_rtptime = -1; - priv->ips_dts = GST_CLOCK_TIME_NONE; + priv->ips_pts = GST_CLOCK_TIME_NONE; } - } else { - GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum); - /* we don't know what the next_in_seqnum should be, wait for the last - * possible moment to push this buffer, maybe we get an earlier seqnum - * while we wait */ - set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts); - do_next_seqnum = TRUE; - /* take rtptime and dts to calculate packet spacing */ - priv->ips_rtptime = rtptime; - priv->ips_dts = dts; - } - - /* We had no huge gap, let's drop all the gap packets */ - if (buffer != NULL) { - GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets"); - g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL); - g_queue_clear (&priv->gap_packets); - } else { - GST_DEBUG_OBJECT (jitterbuffer, - "Had big gap, waiting for more consecutive packets"); - JBUF_UNLOCK (jitterbuffer->priv); - return GST_FLOW_OK; } if (do_next_seqnum) { - priv->last_in_seqnum = seqnum; - priv->last_in_dts = dts; + priv->last_in_pts = pts; priv->next_in_seqnum = (seqnum + 1) & 0xffff; } + timer = find_timer (jitterbuffer, seqnum); + if (GST_BUFFER_IS_RETRANSMISSION (buffer)) { + if (!timer) + timer = timer_queue_find (priv->rtx_stats_timers, seqnum); + if (timer) + timer->num_rtx_received++; + } + + /* At 2^15, we would detect a seqnum rollover too early, therefore + * limit the queue size. But let's not limit it to a number that is + * too small to avoid emptying it needlessly if there is a spurious huge + * sequence number, let's allow at least 10k packets in any case. */ + while (rtp_jitter_buffer_get_seqnum_diff (priv->jbuf) >= 32765 && + rtp_jitter_buffer_num_packets (priv->jbuf) > 10000 && + priv->srcresult == GST_FLOW_OK) + JBUF_WAIT_QUEUE (priv); + if (priv->srcresult != GST_FLOW_OK) + goto out_flushing; + /* let's check if this buffer is too late, we can only accept packets with * bigger seqnum than the one we last pushed. */ if (G_LIKELY (priv->last_popped_seqnum != -1)) { @@ -2574,10 +3173,24 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum); /* priv->last_popped_seqnum >= seqnum, we're too late. */ - if (G_UNLIKELY (gap <= 0)) + if (G_UNLIKELY (gap <= 0)) { + if (priv->do_retransmission) { + if (GST_BUFFER_IS_RETRANSMISSION (buffer) && timer) { + update_rtx_stats (jitterbuffer, timer, dts, FALSE); + /* Only count the retranmitted packet too late if it has been + * considered lost. If the original packet arrived before the + * retransmitted we just count it as a duplicate. */ + if (timer->type != TIMER_TYPE_LOST) + goto rtx_duplicate; + } + } goto too_late; + } } + if (already_lost (jitterbuffer, seqnum)) + goto already_lost; + /* let's drop oldest packet if the queue is already full and drop-on-latency * is set. We can only do this when there actually is a latency. When no * latency is set, we just pump it in the queue and let the other end push it @@ -2595,7 +3208,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent); GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p", old_item); - priv->next_seqnum = (old_item->seqnum + 1) & 0xffff; + priv->next_seqnum = (old_item->seqnum + old_item->count) & 0xffff; free_item (old_item); } /* we might have removed some head buffers, signal the pushing thread to @@ -2604,17 +3217,34 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, } } - item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime); + /* If we estimated the DTS, don't consider it in the clock skew calculations + * later. The code above always sets dts to pts or the other way around if + * any of those is valid in the buffer, so we know that if we estimated the + * dts that both are unknown */ + if (estimated_dts) + item = + alloc_item (buffer, ITEM_TYPE_BUFFER, GST_CLOCK_TIME_NONE, + pts, seqnum, 1, rtptime); + else + item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime); /* now insert the packet into the queue in sorted order. This function returns * FALSE if a packet with the same seqnum was already in the queue, meaning we * have a duplicate. */ - if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item, - &head, &percent))) + if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item, &head, + &percent))) { + if (GST_BUFFER_IS_RETRANSMISSION (buffer) && timer) + update_rtx_stats (jitterbuffer, timer, dts, FALSE); goto duplicate; + } + + /* Trigger fast start if needed */ + if (gst_rtp_jitter_buffer_fast_start (jitterbuffer)) + head = TRUE; /* update timers */ - update_timers (jitterbuffer, seqnum, dts, do_next_seqnum); + update_timers (jitterbuffer, seqnum, dts, pts, do_next_seqnum, + GST_BUFFER_IS_RETRANSMISSION (buffer), timer); /* we had an unhandled SR, handle it now */ if (priv->last_sr) @@ -2679,31 +3309,63 @@ have_eos: } too_late: { - GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already" + GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already" " popped, dropping", seqnum, priv->last_popped_seqnum); priv->num_late++; gst_buffer_unref (buffer); goto finished; } +already_lost: + { + GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as it was already " + "considered lost", seqnum); + priv->num_late++; + gst_buffer_unref (buffer); + goto finished; + } duplicate: { - GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping", + GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping", seqnum); priv->num_duplicates++; free_item (item); goto finished; } -gap_but_no_dts: +rtx_duplicate: { - /* this is fatal as we can't compensate for gaps without DTS */ - GST_ELEMENT_ERROR (jitterbuffer, STREAM, DECODE, (NULL), - ("Received packet without DTS after a gap")); + GST_DEBUG_OBJECT (jitterbuffer, + "Duplicate RTX packet #%d detected, dropping", seqnum); + priv->num_duplicates++; gst_buffer_unref (buffer); - ret = GST_FLOW_ERROR; goto finished; } } +/* FIXME: hopefully we can do something more efficient here, especially when + * all packets are in order and/or outside of the currently cached range. + * Still worthwhile to have it, avoids taking/releasing object lock and pad + * stream lock for every single buffer in the default chain_list fallback. */ +static GstFlowReturn +gst_rtp_jitter_buffer_chain_list (GstPad * pad, GstObject * parent, + GstBufferList * buffer_list) +{ + GstFlowReturn flow_ret = GST_FLOW_OK; + guint i, n; + + n = gst_buffer_list_length (buffer_list); + for (i = 0; i < n; ++i) { + GstBuffer *buf = gst_buffer_list_get (buffer_list, i); + + flow_ret = gst_rtp_jitter_buffer_chain (pad, parent, gst_buffer_ref (buf)); + + if (flow_ret != GST_FLOW_OK) + break; + } + gst_buffer_list_unref (buffer_list); + + return flow_ret; +} + static GstClockTime compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item) { @@ -2775,7 +3437,7 @@ update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, } /* this is the current time as running-time */ - out_time = item->dts; + out_time = item->pts; if (elapsed > 0) estimated = gst_util_uint64_scale (out_time, total, elapsed); @@ -2835,9 +3497,16 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum) } dts = - gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts); + gst_segment_position_from_running_time (&priv->segment, + GST_FORMAT_TIME, item->dts); pts = - gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts); + gst_segment_position_from_running_time (&priv->segment, + GST_FORMAT_TIME, item->pts); + + /* if this is a new frame, check if ts_offset needs to be updated */ + if (pts != priv->last_pts) { + update_offset (jitterbuffer); + } /* apply timestamp with offset to buffer now */ GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts); @@ -2846,6 +3515,7 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum) /* update the elapsed time when we need to check against the npt stop time. */ update_estimated_eos (jitterbuffer, item); + priv->last_pts = pts; priv->last_out_time = GST_BUFFER_PTS (outbuf); break; case ITEM_TYPE_LOST: @@ -2868,6 +3538,17 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum) priv->next_seqnum = (seqnum + item->count) & 0xffff; } msg = check_buffering_percent (jitterbuffer, percent); + + if (type == ITEM_TYPE_EVENT && outevent && + GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) { + g_assert (priv->eos); + while (priv->timers->len > 0) { + /* Stopping timers */ + unschedule_current_timer (jitterbuffer); + JBUF_WAIT_TIMER (priv); + } + } + JBUF_UNLOCK (priv); item->data = NULL; @@ -2883,6 +3564,7 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum) "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT, seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)), GST_TIME_ARGS (GST_BUFFER_PTS (outbuf))); + priv->num_pushed++; result = gst_pad_push (priv->srcpad, outbuf); JBUF_LOCK_CHECK (priv, out_flushing); @@ -2891,7 +3573,7 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum) case ITEM_TYPE_EVENT: /* We got not enough consecutive packets with a huge gap, we can * as well just drop them here now on EOS */ - if (GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) { + if (outevent && GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) { GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets on EOS"); g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL); g_queue_clear (&priv->gap_packets); @@ -2902,7 +3584,7 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum) if (do_push) gst_pad_push_event (priv->srcpad, outevent); - else + else if (outevent) gst_event_unref (outevent); result = GST_FLOW_OK; @@ -2941,30 +3623,31 @@ static GstFlowReturn handle_next_buffer (GstRtpJitterBuffer * jitterbuffer) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - GstFlowReturn result = GST_FLOW_OK; + GstFlowReturn result; RTPJitterBufferItem *item; guint seqnum; guint32 next_seqnum; - gint gap; /* only push buffers when PLAYING and active and not buffering */ if (priv->blocked || !priv->active || - rtp_jitter_buffer_is_buffering (priv->jbuf)) + rtp_jitter_buffer_is_buffering (priv->jbuf)) { return GST_FLOW_WAIT; + } -again: /* peek a buffer, we're just looking at the sequence number. * If all is fine, we'll pop and push it. If the sequence number is wrong we * wait for a timeout or something to change. * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */ item = rtp_jitter_buffer_peek (priv->jbuf); - if (item == NULL) + if (item == NULL) { goto wait; + } /* get the seqnum and the next expected seqnum */ seqnum = item->seqnum; - if (seqnum == -1) - goto do_push; + if (seqnum == -1) { + return pop_and_push_next (jitterbuffer, seqnum); + } next_seqnum = priv->next_seqnum; @@ -2977,41 +3660,45 @@ again: * fires, so wait for that */ result = GST_FLOW_WAIT; } else { - /* else calculate GAP */ - gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum); + gint gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum); if (G_LIKELY (gap == 0)) { - do_push: /* no missing packet, pop and push */ result = pop_and_push_next (jitterbuffer, seqnum); } else if (G_UNLIKELY (gap < 0)) { - RTPJitterBufferItem *item; /* if we have a packet that we already pushed or considered dropped, pop it * off and get the next packet */ GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping", seqnum, next_seqnum); item = rtp_jitter_buffer_pop (priv->jbuf, NULL); free_item (item); - goto again; + result = GST_FLOW_OK; } else { /* the chain function has scheduled timers to request retransmission or * when to consider the packet lost, wait for that */ GST_DEBUG_OBJECT (jitterbuffer, "Sequence number GAP detected: expected %d instead of %d (%d missing)", next_seqnum, seqnum, gap); - result = GST_FLOW_WAIT; + /* if we have reached EOS, just keep processing */ + if (priv->eos) { + result = pop_and_push_next (jitterbuffer, seqnum); + result = GST_FLOW_OK; + } else { + result = GST_FLOW_WAIT; + } } } + return result; wait: { GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait"); - if (priv->eos) - result = GST_FLOW_EOS; - else - result = GST_FLOW_WAIT; - return result; + if (priv->eos) { + return GST_FLOW_EOS; + } else { + return GST_FLOW_WAIT; + } } } @@ -3063,6 +3750,78 @@ get_rtx_retry_period (GstRtpJitterBufferPrivate * priv, return rtx_retry_period; } +/* + 1. For *larger* rtx-rtt, weigh a new measurement as before (1/8th) + 2. For *smaller* rtx-rtt, be a bit more conservative and weigh a bit less (1/16th) + 3. For very large measurements (> avg * 2), consider them "outliers" + and count them a lot less (1/48th) +*/ +static void +update_avg_rtx_rtt (GstRtpJitterBufferPrivate * priv, GstClockTime rtt) +{ + gint weight; + + if (priv->avg_rtx_rtt == 0) { + priv->avg_rtx_rtt = rtt; + return; + } + + if (rtt > 2 * priv->avg_rtx_rtt) + weight = 48; + else if (rtt > priv->avg_rtx_rtt) + weight = 8; + else + weight = 16; + + priv->avg_rtx_rtt = (rtt + (weight - 1) * priv->avg_rtx_rtt) / weight; +} + +static void +update_rtx_stats (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, + GstClockTime dts, gboolean success) +{ + GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; + GstClockTime delay; + + if (success) { + /* we scheduled a retry for this packet and now we have it */ + priv->num_rtx_success++; + /* all the previous retry attempts failed */ + priv->num_rtx_failed += timer->num_rtx_retry - 1; + } else { + /* All retries failed or was too late */ + priv->num_rtx_failed += timer->num_rtx_retry; + } + + /* number of retries before (hopefully) receiving the packet */ + if (priv->avg_rtx_num == 0.0) + priv->avg_rtx_num = timer->num_rtx_retry; + else + priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8; + + /* Calculate the delay between retransmission request and receiving this + * packet. We have a valid delay if and only if this packet is a response to + * our last request. If not we don't know if this is a response to an + * earlier request and delay could be way off. For RTT is more important + * with correct values than to update for every packet. */ + if (timer->num_rtx_retry == timer->num_rtx_received && + dts != GST_CLOCK_TIME_NONE && dts > timer->rtx_last) { + delay = dts - timer->rtx_last; + update_avg_rtx_rtt (priv, delay); + } else { + delay = 0; + } + + GST_LOG_OBJECT (jitterbuffer, + "RTX #%d, result %d, success %" G_GUINT64_FORMAT ", failed %" + G_GUINT64_FORMAT ", requests %" G_GUINT64_FORMAT ", dups %" + G_GUINT64_FORMAT ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %" + GST_TIME_FORMAT, timer->seqnum, success, priv->num_rtx_success, + priv->num_rtx_failed, priv->num_rtx_requests, priv->num_duplicates, + priv->avg_rtx_num, GST_TIME_ARGS (delay), + GST_TIME_ARGS (priv->avg_rtx_rtt)); +} + /* the timeout for when we expected a packet expired */ static gboolean do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, @@ -3072,6 +3831,7 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, GstEvent *event; guint delay, delay_ms, avg_rtx_rtt_ms; guint rtx_retry_timeout_ms, rtx_retry_period_ms; + guint rtx_deadline_ms; GstClockTime rtx_retry_period; GstClockTime rtx_retry_timeout; GstClock *clock; @@ -3082,16 +3842,14 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, rtx_retry_timeout = get_rtx_retry_timeout (priv); rtx_retry_period = get_rtx_retry_period (priv, rtx_retry_timeout); - GST_DEBUG_OBJECT (jitterbuffer, "timeout %" GST_TIME_FORMAT ", period %" - GST_TIME_FORMAT, GST_TIME_ARGS (rtx_retry_timeout), - GST_TIME_ARGS (rtx_retry_period)); - delay = timer->rtx_delay + timer->rtx_retry; delay_ms = GST_TIME_AS_MSECONDS (delay); rtx_retry_timeout_ms = GST_TIME_AS_MSECONDS (rtx_retry_timeout); rtx_retry_period_ms = GST_TIME_AS_MSECONDS (rtx_retry_period); avg_rtx_rtt_ms = GST_TIME_AS_MSECONDS (priv->avg_rtx_rtt); + rtx_deadline_ms = + priv->rtx_deadline_ms != -1 ? priv->rtx_deadline_ms : priv->latency_ms; event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, gst_structure_new ("GstRTPRetransmissionRequest", @@ -3101,9 +3859,10 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, "retry", G_TYPE_UINT, timer->num_rtx_retry, "frequency", G_TYPE_UINT, rtx_retry_timeout_ms, "period", G_TYPE_UINT, rtx_retry_period_ms, - "deadline", G_TYPE_UINT, priv->latency_ms, + "deadline", G_TYPE_UINT, rtx_deadline_ms, "packet-spacing", G_TYPE_UINT64, priv->packet_spacing, "avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL)); + GST_DEBUG_OBJECT (jitterbuffer, "Request RTX: %" GST_PTR_FORMAT, event); priv->num_rtx_requests++; timer->num_rtx_retry++; @@ -3125,7 +3884,8 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry); if ((priv->rtx_max_retries != -1 && timer->num_rtx_retry >= priv->rtx_max_retries) - || (timer->rtx_retry + timer->rtx_delay > rtx_retry_period)) { + || (timer->rtx_retry + timer->rtx_delay > rtx_retry_period) + || (timer->rtx_base + rtx_retry_period < now)) { GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer"); /* too many retransmission request, we now convert the timer * to a lost timer, leave the num_rtx_retry as it is for stats */ @@ -3149,19 +3909,13 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, GstClockTime now) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - GstClockTime duration, timestamp; guint seqnum, lost_packets, num_rtx_retry, next_in_seqnum; - gboolean late, head; - GstEvent *event; + gboolean head; + GstEvent *event = NULL; RTPJitterBufferItem *item; seqnum = timer->seqnum; - timestamp = apply_offset (jitterbuffer, timer->timeout); - duration = timer->duration; - if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0) - duration = priv->packet_spacing; lost_packets = MAX (timer->num, 1); - late = timer->num > 0; num_rtx_retry = timer->num_rtx_retry; /* we had a gap and thus we lost some packets. Create an event for this. */ @@ -3171,29 +3925,46 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, else GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum); - priv->num_late += lost_packets; + priv->num_lost += lost_packets; priv->num_rtx_failed += num_rtx_retry; next_in_seqnum = (seqnum + lost_packets) & 0xffff; /* we now only accept seqnum bigger than this */ - if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0) + if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0) { priv->next_in_seqnum = next_in_seqnum; + priv->last_in_pts = apply_offset (jitterbuffer, timer->timeout); + } - /* create paket lost event */ - event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM, - gst_structure_new ("GstRTPPacketLost", - "seqnum", G_TYPE_UINT, (guint) seqnum, - "timestamp", G_TYPE_UINT64, timestamp, - "duration", G_TYPE_UINT64, duration, - "late", G_TYPE_BOOLEAN, late, - "retry", G_TYPE_UINT, num_rtx_retry, NULL)); - + /* Avoid creating events if we don't need it. Note that we still need to create + * the lost *ITEM* since it will be used to notify the outgoing thread of + * lost items (so that we can set discont flags and such) */ + if (priv->do_lost) { + GstClockTime duration, timestamp; + /* create paket lost event */ + timestamp = apply_offset (jitterbuffer, timer->timeout); + duration = timer->duration; + if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0) + duration = priv->packet_spacing; + event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM, + gst_structure_new ("GstRTPPacketLost", + "seqnum", G_TYPE_UINT, (guint) seqnum, + "timestamp", G_TYPE_UINT64, timestamp, + "duration", G_TYPE_UINT64, duration, + "retry", G_TYPE_UINT, num_rtx_retry, NULL)); + } item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1); - rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL); + if (!rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL)) + /* Duplicate */ + free_item (item); - /* remove timer now */ + if (GST_CLOCK_TIME_IS_VALID (timer->rtx_last)) { + /* Store info to update stats if the packet arrives too late */ + timer_queue_append (priv->rtx_stats_timers, timer, + now + priv->rtx_stats_timeout * GST_MSECOND, TRUE); + } remove_timer (jitterbuffer, timer); + if (head) JBUF_SIGNAL_EVENT (priv); @@ -3209,8 +3980,13 @@ do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer, GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout"); remove_timer (jitterbuffer, timer); if (!priv->eos) { + GstEvent *event; + /* there was no EOS in the buffer, put one in there now */ - queue_event (jitterbuffer, gst_event_new_eos ()); + event = gst_event_new_eos (); + if (priv->segment_seqnum != GST_SEQNUM_INVALID) + gst_event_set_seqnum (event, priv->segment_seqnum); + queue_event (jitterbuffer, event); } JBUF_SIGNAL_EVENT (priv); @@ -3277,44 +4053,81 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer) GstClockTime timer_timeout = -1; gint i, len; + /* If we have a clock, update "now" now with the very + * latest running time we have. If timers are unscheduled below we + * otherwise wouldn't update now (it's only updated when timers + * expire), and also for the very first loop iteration now would + * otherwise always be 0 + */ + GST_OBJECT_LOCK (jitterbuffer); + if (priv->eos) { + now = GST_CLOCK_TIME_NONE; + } else if (GST_ELEMENT_CLOCK (jitterbuffer)) { + now = + gst_clock_get_time (GST_ELEMENT_CLOCK (jitterbuffer)) - + GST_ELEMENT_CAST (jitterbuffer)->base_time; + } + GST_OBJECT_UNLOCK (jitterbuffer); + GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT, GST_TIME_ARGS (now)); + /* Clear expired rtx-stats timers */ + if (priv->do_retransmission) + timer_queue_clear_until (priv->rtx_stats_timers, now); + + /* Iterate "normal" timers */ len = priv->timers->len; - for (i = 0; i < len; i++) { + for (i = 0; i < len;) { TimerData *test = &g_array_index (priv->timers, TimerData, i); GstClockTime test_timeout = get_timeout (jitterbuffer, test); gboolean save_best = FALSE; - GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT, - i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout)); - - /* find the smallest timeout */ - if (timer == NULL) { - save_best = TRUE; - } else if (timer_timeout == -1) { - /* we already have an immediate timeout, the new timer must be an - * immediate timer with smaller seqnum to become the best */ - if (test_timeout == -1 + GST_DEBUG_OBJECT (jitterbuffer, + "%d, %d, %d, %" GST_TIME_FORMAT " diff:%" GST_STIME_FORMAT, i, + test->type, test->seqnum, GST_TIME_ARGS (test_timeout), + GST_STIME_ARGS ((gint64) (test_timeout - now))); + + /* Weed out anything too late */ + if (test->type == TIMER_TYPE_LOST && + (test_timeout == -1 || test_timeout <= now)) { + GST_DEBUG_OBJECT (jitterbuffer, "Weeding out late entry"); + do_lost_timeout (jitterbuffer, test, now); + if (!priv->timer_running) + break; + /* We don't move the iterator forward since we just removed the current entry, + * but we update the termination condition */ + len = priv->timers->len; + } else { + /* find the smallest timeout */ + if (timer == NULL) { + save_best = TRUE; + } else if (timer_timeout == -1) { + /* we already have an immediate timeout, the new timer must be an + * immediate timer with smaller seqnum to become the best */ + if (test_timeout == -1 + && (gst_rtp_buffer_compare_seqnum (test->seqnum, + timer->seqnum) > 0)) + save_best = TRUE; + } else if (test_timeout == -1) { + /* first immediate timer */ + save_best = TRUE; + } else if (test_timeout < timer_timeout) { + /* earlier timer */ + save_best = TRUE; + } else if (test_timeout == timer_timeout && (gst_rtp_buffer_compare_seqnum (test->seqnum, - timer->seqnum) > 0)) + timer->seqnum) > 0)) { + /* same timer, smaller seqnum */ save_best = TRUE; - } else if (test_timeout == -1) { - /* first immediate timer */ - save_best = TRUE; - } else if (test_timeout < timer_timeout) { - /* earlier timer */ - save_best = TRUE; - } else if (test_timeout == timer_timeout - && (gst_rtp_buffer_compare_seqnum (test->seqnum, - timer->seqnum) > 0)) { - /* same timer, smaller seqnum */ - save_best = TRUE; - } - if (save_best) { - GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i); - timer = test; - timer_timeout = test_timeout; + } + + if (save_best) { + GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i); + timer = test; + timer_timeout = test_timeout; + } + i++; } } if (timer && !priv->blocked) { @@ -3324,7 +4137,10 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer) GstClockReturn ret; GstClockTimeDiff clock_jitter; - if (timer_timeout == -1 || timer_timeout <= now) { + if (timer_timeout == -1 || timer_timeout <= now || priv->eos) { + /* We have normally removed all lost timers in the loop above */ + g_assert (timer->type != TIMER_TYPE_LOST); + do_timeout (jitterbuffer, timer, now); /* check here, do_timeout could have released the lock */ if (!priv->timer_running) @@ -3371,8 +4187,9 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer) if (ret != GST_CLOCK_UNSCHEDULED) { now = timer_timeout + MAX (clock_jitter, 0); - GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT, - ret, priv->timer_seqnum, clock_jitter); + GST_DEBUG_OBJECT (jitterbuffer, + "sync done, %d, #%d, %" GST_STIME_FORMAT, ret, priv->timer_seqnum, + GST_STIME_ARGS (clock_jitter)); } else { GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled"); } @@ -3407,6 +4224,7 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer) JBUF_LOCK_CHECK (priv, flushing); do { result = handle_next_buffer (jitterbuffer); + JBUF_SIGNAL_QUEUE (priv); if (G_LIKELY (result == GST_FLOW_WAIT)) { /* now wait for the next event */ JBUF_WAIT_EVENT (priv, flushing); @@ -3436,6 +4254,8 @@ pause: gst_pad_pause_task (priv->srcpad); if (result == GST_FLOW_EOS) { event = gst_event_new_eos (); + if (priv->segment_seqnum != GST_SEQNUM_INVALID) + gst_event_set_seqnum (event, priv->segment_seqnum); gst_pad_push_event (priv->srcpad, event); } return; @@ -3487,7 +4307,10 @@ do_handle_sync (GstRtpJitterBuffer * jitterbuffer) /* check how far ahead it is to our RTP timestamps */ diff = ext_rtptime - last_rtptime; /* if bigger than 1 second, we drop it */ - if (diff > clock_rate) { + if (jitterbuffer->priv->max_rtcp_rtp_time_diff != -1 && + diff > + gst_util_uint64_scale (jitterbuffer->priv->max_rtcp_rtp_time_diff, + clock_rate, 1000)) { GST_DEBUG_OBJECT (jitterbuffer, "too far ahead"); /* should drop this, but some RTSP servers end up with bogus * way too ahead RTCP packet when repeated PAUSE/PLAY, @@ -3805,10 +4628,26 @@ gst_rtp_jitter_buffer_set_property (GObject * object, break; case PROP_TS_OFFSET: JBUF_LOCK (priv); - priv->ts_offset = g_value_get_int64 (value); + if (priv->max_ts_offset_adjustment != 0) { + gint64 new_offset = g_value_get_int64 (value); + + if (new_offset > priv->ts_offset) { + priv->ts_offset_remainder = new_offset - priv->ts_offset; + } else { + priv->ts_offset_remainder = -(priv->ts_offset - new_offset); + } + } else { + priv->ts_offset = g_value_get_int64 (value); + priv->ts_offset_remainder = 0; + } priv->ts_discont = TRUE; JBUF_UNLOCK (priv); break; + case PROP_MAX_TS_OFFSET_ADJUSTMENT: + JBUF_LOCK (priv); + priv->max_ts_offset_adjustment = g_value_get_uint64 (value); + JBUF_UNLOCK (priv); + break; case PROP_DO_LOST: JBUF_LOCK (priv); priv->do_lost = g_value_get_boolean (value); @@ -3864,6 +4703,42 @@ gst_rtp_jitter_buffer_set_property (GObject * object, priv->rtx_max_retries = g_value_get_int (value); JBUF_UNLOCK (priv); break; + case PROP_RTX_DEADLINE: + JBUF_LOCK (priv); + priv->rtx_deadline_ms = g_value_get_int (value); + JBUF_UNLOCK (priv); + break; + case PROP_RTX_STATS_TIMEOUT: + JBUF_LOCK (priv); + priv->rtx_stats_timeout = g_value_get_uint (value); + JBUF_UNLOCK (priv); + break; + case PROP_MAX_RTCP_RTP_TIME_DIFF: + JBUF_LOCK (priv); + priv->max_rtcp_rtp_time_diff = g_value_get_int (value); + JBUF_UNLOCK (priv); + break; + case PROP_MAX_DROPOUT_TIME: + JBUF_LOCK (priv); + priv->max_dropout_time = g_value_get_uint (value); + JBUF_UNLOCK (priv); + break; + case PROP_MAX_MISORDER_TIME: + JBUF_LOCK (priv); + priv->max_misorder_time = g_value_get_uint (value); + JBUF_UNLOCK (priv); + break; + case PROP_RFC7273_SYNC: + JBUF_LOCK (priv); + rtp_jitter_buffer_set_rfc7273_sync (priv->jbuf, + g_value_get_boolean (value)); + JBUF_UNLOCK (priv); + break; + case PROP_FASTSTART_MIN_PACKETS: + JBUF_LOCK (priv); + priv->faststart_min_packets = g_value_get_uint (value); + JBUF_UNLOCK (priv); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -3896,6 +4771,11 @@ gst_rtp_jitter_buffer_get_property (GObject * object, g_value_set_int64 (value, priv->ts_offset); JBUF_UNLOCK (priv); break; + case PROP_MAX_TS_OFFSET_ADJUSTMENT: + JBUF_LOCK (priv); + g_value_set_uint64 (value, priv->max_ts_offset_adjustment); + JBUF_UNLOCK (priv); + break; case PROP_DO_LOST: JBUF_LOCK (priv); g_value_set_boolean (value, priv->do_lost); @@ -3965,10 +4845,46 @@ gst_rtp_jitter_buffer_get_property (GObject * object, g_value_set_int (value, priv->rtx_max_retries); JBUF_UNLOCK (priv); break; + case PROP_RTX_DEADLINE: + JBUF_LOCK (priv); + g_value_set_int (value, priv->rtx_deadline_ms); + JBUF_UNLOCK (priv); + break; + case PROP_RTX_STATS_TIMEOUT: + JBUF_LOCK (priv); + g_value_set_uint (value, priv->rtx_stats_timeout); + JBUF_UNLOCK (priv); + break; case PROP_STATS: g_value_take_boxed (value, gst_rtp_jitter_buffer_create_stats (jitterbuffer)); break; + case PROP_MAX_RTCP_RTP_TIME_DIFF: + JBUF_LOCK (priv); + g_value_set_int (value, priv->max_rtcp_rtp_time_diff); + JBUF_UNLOCK (priv); + break; + case PROP_MAX_DROPOUT_TIME: + JBUF_LOCK (priv); + g_value_set_uint (value, priv->max_dropout_time); + JBUF_UNLOCK (priv); + break; + case PROP_MAX_MISORDER_TIME: + JBUF_LOCK (priv); + g_value_set_uint (value, priv->max_misorder_time); + JBUF_UNLOCK (priv); + break; + case PROP_RFC7273_SYNC: + JBUF_LOCK (priv); + g_value_set_boolean (value, + rtp_jitter_buffer_get_rfc7273_sync (priv->jbuf)); + JBUF_UNLOCK (priv); + break; + case PROP_FASTSTART_MIN_PACKETS: + JBUF_LOCK (priv); + g_value_set_uint (value, priv->faststart_min_packets); + JBUF_UNLOCK (priv); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -3978,15 +4894,21 @@ gst_rtp_jitter_buffer_get_property (GObject * object, static GstStructure * gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf) { + GstRtpJitterBufferPrivate *priv = jbuf->priv; GstStructure *s; - JBUF_LOCK (jbuf->priv); + JBUF_LOCK (priv); s = gst_structure_new ("application/x-rtp-jitterbuffer-stats", - "rtx-count", G_TYPE_UINT64, jbuf->priv->num_rtx_requests, - "rtx-success-count", G_TYPE_UINT64, jbuf->priv->num_rtx_success, - "rtx-per-packet", G_TYPE_DOUBLE, jbuf->priv->avg_rtx_num, - "rtx-rtt", G_TYPE_UINT64, jbuf->priv->avg_rtx_rtt, NULL); - JBUF_UNLOCK (jbuf->priv); + "num-pushed", G_TYPE_UINT64, priv->num_pushed, + "num-lost", G_TYPE_UINT64, priv->num_lost, + "num-late", G_TYPE_UINT64, priv->num_late, + "num-duplicates", G_TYPE_UINT64, priv->num_duplicates, + "avg-jitter", G_TYPE_UINT64, priv->avg_jitter, + "rtx-count", G_TYPE_UINT64, priv->num_rtx_requests, + "rtx-success-count", G_TYPE_UINT64, priv->num_rtx_success, + "rtx-per-packet", G_TYPE_DOUBLE, priv->avg_rtx_num, + "rtx-rtt", G_TYPE_UINT64, priv->avg_rtx_rtt, NULL); + JBUF_UNLOCK (priv); return s; }