* depayloader or other element to create concealment data or some other logic
* to gracefully handle the missing packets.
*
- * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
+ * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incoming
* buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
* buffer.
*
*
* - If seqnum N arrived, all seqnum older than
* N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
- * immediately. This is to request fast feedback for abonormally reorder
+ * immediately. This is to request fast feedback for abnormally reorder
* packets before any of the previous timeouts is triggered.
*
* A late packet triggers the GstRTPRetransmissionRequest custom upstream
(g_mutex_unlock (&(priv)->jbuf_lock)); \
} G_STMT_END
+#define JBUF_WAIT_QUEUE(priv) G_STMT_START { \
+ GST_DEBUG ("waiting queue"); \
+ (priv)->waiting_queue++; \
+ g_cond_wait (&(priv)->jbuf_queue, &(priv)->jbuf_lock); \
+ (priv)->waiting_queue--; \
+ GST_DEBUG ("waiting queue done"); \
+} G_STMT_END
+#define JBUF_SIGNAL_QUEUE(priv) G_STMT_START { \
+ if (G_UNLIKELY ((priv)->waiting_queue)) { \
+ GST_DEBUG ("signal queue, %d waiters", (priv)->waiting_queue); \
+ g_cond_signal (&(priv)->jbuf_queue); \
+ } \
+} G_STMT_END
+
#define JBUF_WAIT_TIMER(priv) G_STMT_START { \
GST_DEBUG ("waiting timer"); \
(priv)->waiting_timer++; \
RTPJitterBuffer *jbuf;
GMutex jbuf_lock;
+ gboolean waiting_queue;
+ GCond jbuf_queue;
gboolean waiting_timer;
GCond jbuf_timer;
gboolean waiting_event;
gboolean ts_discont;
gboolean active;
guint64 out_offset;
+ guint32 segment_seqnum;
gboolean timer_running;
GThread *timer_thread;
g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
"Sending retransmission event when this much reordering "
- "(0 disable, -1 automatic)",
+ "(0 disable)",
-1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
g_param_spec_uint ("max-dropout-time", "Max dropout time",
"The maximum time (milliseconds) of missing packets tolerated.",
- 0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
+ 0, G_MAXINT32, DEFAULT_MAX_DROPOUT_TIME,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
GST_DEBUG_CATEGORY_INIT
(rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
+ GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_jitter_buffer_chain_rtcp);
}
static void
priv->last_pts = -1;
priv->last_rtptime = -1;
priv->avg_jitter = 0;
+ priv->segment_seqnum = GST_SEQNUM_INVALID;
priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
priv->rtx_stats_timers = timer_queue_new ();
priv->jbuf = rtp_jitter_buffer_new ();
g_mutex_init (&priv->jbuf_lock);
+ g_cond_init (&priv->jbuf_queue);
g_cond_init (&priv->jbuf_timer);
g_cond_init (&priv->jbuf_event);
g_cond_init (&priv->jbuf_query);
g_array_free (priv->timers, TRUE);
timer_queue_free (priv->rtx_stats_timers);
g_mutex_clear (&priv->jbuf_lock);
+ g_cond_clear (&priv->jbuf_queue);
g_cond_clear (&priv->jbuf_timer);
g_cond_clear (&priv->jbuf_event);
g_cond_clear (&priv->jbuf_query);
/* this unblocks any waiting pops on the src pad task */
JBUF_SIGNAL_EVENT (priv);
JBUF_SIGNAL_QUERY (priv, FALSE);
+ JBUF_SIGNAL_QUEUE (priv);
JBUF_UNLOCK (priv);
}
priv->last_rtptime = -1;
priv->last_in_pts = 0;
priv->equidistant = 0;
+ priv->segment_seqnum = GST_SEQNUM_INVALID;
GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
/* block until we go to PLAYING */
priv->blocked = TRUE;
priv->timer_running = TRUE;
+ priv->srcresult = GST_FLOW_OK;
priv->timer_thread =
g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
JBUF_UNLOCK (priv);
JBUF_LOCK (priv);
gst_buffer_replace (&priv->last_sr, NULL);
priv->timer_running = FALSE;
+ priv->srcresult = GST_FLOW_FLUSHING;
unschedule_current_timer (jitterbuffer);
JBUF_SIGNAL_TIMER (priv);
JBUF_SIGNAL_QUERY (priv, FALSE);
+ JBUF_SIGNAL_QUEUE (priv);
JBUF_UNLOCK (priv);
g_thread_join (priv->timer_thread);
priv->timer_thread = NULL;
GstSegment segment;
gst_event_copy_segment (event, &segment);
+ priv->segment_seqnum = gst_event_get_seqnum (event);
+
/* we need time for now */
if (segment.format != GST_FORMAT_TIME) {
GST_DEBUG_OBJECT (jitterbuffer, "ignoring non-TIME newsegment");
gst_segment_init (&segment, GST_FORMAT_TIME);
event = gst_event_new_segment (&segment);
+ gst_event_set_seqnum (event, priv->segment_seqnum);
}
priv->segment = segment;
timer->num_rtx_received++;
}
+ /* At 2^15, we would detect a seqnum rollover too early, therefore
+ * limit the queue size. But let's not limit it to a number that is
+ * too small to avoid emptying it needlessly if there is a spurious huge
+ * sequence number, let's allow at least 10k packets in any case. */
+ while (rtp_jitter_buffer_get_seqnum_diff (priv->jbuf) >= 32765 &&
+ rtp_jitter_buffer_num_packets (priv->jbuf) > 10000 &&
+ priv->srcresult == GST_FLOW_OK)
+ JBUF_WAIT_QUEUE (priv);
+ if (priv->srcresult != GST_FLOW_OK)
+ goto out_flushing;
+
/* let's check if this buffer is too late, we can only accept packets with
* bigger seqnum than the one we last pushed. */
if (G_LIKELY (priv->last_popped_seqnum != -1)) {
GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
remove_timer (jitterbuffer, timer);
if (!priv->eos) {
+ GstEvent *event;
+
/* there was no EOS in the buffer, put one in there now */
- queue_event (jitterbuffer, gst_event_new_eos ());
+ event = gst_event_new_eos ();
+ if (priv->segment_seqnum != GST_SEQNUM_INVALID)
+ gst_event_set_seqnum (event, priv->segment_seqnum);
+ queue_event (jitterbuffer, event);
}
JBUF_SIGNAL_EVENT (priv);
JBUF_LOCK_CHECK (priv, flushing);
do {
result = handle_next_buffer (jitterbuffer);
+ JBUF_SIGNAL_QUEUE (priv);
if (G_LIKELY (result == GST_FLOW_WAIT)) {
/* now wait for the next event */
JBUF_WAIT_EVENT (priv, flushing);
gst_pad_pause_task (priv->srcpad);
if (result == GST_FLOW_EOS) {
event = gst_event_new_eos ();
+ if (priv->segment_seqnum != GST_SEQNUM_INVALID)
+ gst_event_set_seqnum (event, priv->segment_seqnum);
gst_pad_push_event (priv->srcpad, event);
}
return;