* depayloader or other element to create concealment data or some other logic
* to gracefully handle the missing packets.
*
- * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
+ * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incoming
* buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
* buffer.
*
*
* - If seqnum N arrived, all seqnum older than
* N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
- * immediately. This is to request fast feedback for abonormally reorder
+ * immediately. This is to request fast feedback for abnormally reorder
* packets before any of the previous timeouts is triggered.
*
* A late packet triggers the GstRTPRetransmissionRequest custom upstream
(g_mutex_unlock (&(priv)->jbuf_lock)); \
} G_STMT_END
+#define JBUF_WAIT_QUEUE(priv) G_STMT_START { \
+ GST_DEBUG ("waiting queue"); \
+ (priv)->waiting_queue++; \
+ g_cond_wait (&(priv)->jbuf_queue, &(priv)->jbuf_lock); \
+ (priv)->waiting_queue--; \
+ GST_DEBUG ("waiting queue done"); \
+} G_STMT_END
+#define JBUF_SIGNAL_QUEUE(priv) G_STMT_START { \
+ if (G_UNLIKELY ((priv)->waiting_queue)) { \
+ GST_DEBUG ("signal queue, %d waiters", (priv)->waiting_queue); \
+ g_cond_signal (&(priv)->jbuf_queue); \
+ } \
+} G_STMT_END
+
#define JBUF_WAIT_TIMER(priv) G_STMT_START { \
GST_DEBUG ("waiting timer"); \
- (priv)->waiting_timer = TRUE; \
+ (priv)->waiting_timer++; \
g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock); \
- (priv)->waiting_timer = FALSE; \
+ (priv)->waiting_timer--; \
GST_DEBUG ("waiting timer done"); \
} G_STMT_END
#define JBUF_SIGNAL_TIMER(priv) G_STMT_START { \
if (G_UNLIKELY ((priv)->waiting_timer)) { \
- GST_DEBUG ("signal timer"); \
+ GST_DEBUG ("signal timer, %d waiters", (priv)->waiting_timer); \
g_cond_signal (&(priv)->jbuf_timer); \
} \
} G_STMT_END
RTPJitterBuffer *jbuf;
GMutex jbuf_lock;
+ gboolean waiting_queue;
+ GCond jbuf_queue;
gboolean waiting_timer;
GCond jbuf_timer;
gboolean waiting_event;
gboolean ts_discont;
gboolean active;
guint64 out_offset;
+ guint32 segment_seqnum;
gboolean timer_running;
GThread *timer_thread;
g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
"Sending retransmission event when this much reordering "
- "(0 disable, -1 automatic)",
+ "(0 disable)",
-1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
g_param_spec_uint ("max-dropout-time", "Max dropout time",
"The maximum time (milliseconds) of missing packets tolerated.",
- 0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
+ 0, G_MAXINT32, DEFAULT_MAX_DROPOUT_TIME,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
GST_DEBUG_CATEGORY_INIT
(rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
+ GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_jitter_buffer_chain_rtcp);
}
static void
priv->last_pts = -1;
priv->last_rtptime = -1;
priv->avg_jitter = 0;
+ priv->segment_seqnum = GST_SEQNUM_INVALID;
priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
priv->rtx_stats_timers = timer_queue_new ();
priv->jbuf = rtp_jitter_buffer_new ();
g_mutex_init (&priv->jbuf_lock);
+ g_cond_init (&priv->jbuf_queue);
g_cond_init (&priv->jbuf_timer);
g_cond_init (&priv->jbuf_event);
g_cond_init (&priv->jbuf_query);
g_array_free (priv->timers, TRUE);
timer_queue_free (priv->rtx_stats_timers);
g_mutex_clear (&priv->jbuf_lock);
+ g_cond_clear (&priv->jbuf_queue);
g_cond_clear (&priv->jbuf_timer);
g_cond_clear (&priv->jbuf_event);
g_cond_clear (&priv->jbuf_query);
/* this unblocks any waiting pops on the src pad task */
JBUF_SIGNAL_EVENT (priv);
JBUF_SIGNAL_QUERY (priv, FALSE);
+ JBUF_SIGNAL_QUEUE (priv);
JBUF_UNLOCK (priv);
}
priv->last_rtptime = -1;
priv->last_in_pts = 0;
priv->equidistant = 0;
+ priv->segment_seqnum = GST_SEQNUM_INVALID;
GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
/* block until we go to PLAYING */
priv->blocked = TRUE;
priv->timer_running = TRUE;
+ priv->srcresult = GST_FLOW_OK;
priv->timer_thread =
g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
JBUF_UNLOCK (priv);
JBUF_LOCK (priv);
gst_buffer_replace (&priv->last_sr, NULL);
priv->timer_running = FALSE;
+ priv->srcresult = GST_FLOW_FLUSHING;
unschedule_current_timer (jitterbuffer);
JBUF_SIGNAL_TIMER (priv);
JBUF_SIGNAL_QUERY (priv, FALSE);
+ JBUF_SIGNAL_QUEUE (priv);
JBUF_UNLOCK (priv);
g_thread_join (priv->timer_thread);
priv->timer_thread = NULL;
GstSegment segment;
gst_event_copy_segment (event, &segment);
+ priv->segment_seqnum = gst_event_get_seqnum (event);
+
/* we need time for now */
if (segment.format != GST_FORMAT_TIME) {
GST_DEBUG_OBJECT (jitterbuffer, "ignoring non-TIME newsegment");
gst_segment_init (&segment, GST_FORMAT_TIME);
event = gst_event_new_segment (&segment);
+ gst_event_set_seqnum (event, priv->segment_seqnum);
}
priv->segment = segment;
GST_DEBUG_OBJECT (jitterbuffer, "adding event");
item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
- if (head)
+ if (head || priv->eos)
JBUF_SIGNAL_EVENT (priv);
return TRUE;
GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
g_array_remove_index_fast (priv->timers, idx);
timer->idx = idx;
+
+ JBUF_SIGNAL_TIMER (priv);
}
static void
GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
g_array_set_size (priv->timers, 0);
unschedule_current_timer (jitterbuffer);
+ JBUF_SIGNAL_TIMER (priv);
}
/* get the extra delay to wait before sending RTX */
timer->num_rtx_received++;
}
+ /* At 2^15, we would detect a seqnum rollover too early, therefore
+ * limit the queue size. But let's not limit it to a number that is
+ * too small to avoid emptying it needlessly if there is a spurious huge
+ * sequence number, let's allow at least 10k packets in any case. */
+ while (rtp_jitter_buffer_get_seqnum_diff (priv->jbuf) >= 32765 &&
+ rtp_jitter_buffer_num_packets (priv->jbuf) > 10000 &&
+ priv->srcresult == GST_FLOW_OK)
+ JBUF_WAIT_QUEUE (priv);
+ if (priv->srcresult != GST_FLOW_OK)
+ goto out_flushing;
+
/* let's check if this buffer is too late, we can only accept packets with
* bigger seqnum than the one we last pushed. */
if (G_LIKELY (priv->last_popped_seqnum != -1)) {
priv->next_seqnum = (seqnum + item->count) & 0xffff;
}
msg = check_buffering_percent (jitterbuffer, percent);
+
+ if (type == ITEM_TYPE_EVENT && outevent &&
+ GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
+ g_assert (priv->eos);
+ while (priv->timers->len > 0) {
+ /* Stopping timers */
+ unschedule_current_timer (jitterbuffer);
+ JBUF_WAIT_TIMER (priv);
+ }
+ }
+
JBUF_UNLOCK (priv);
item->data = NULL;
GST_DEBUG_OBJECT (jitterbuffer,
"Sequence number GAP detected: expected %d instead of %d (%d missing)",
next_seqnum, seqnum, gap);
- result = GST_FLOW_WAIT;
+ /* if we have reached EOS, just keep processing */
+ if (priv->eos) {
+ result = pop_and_push_next (jitterbuffer, seqnum);
+ result = GST_FLOW_OK;
+ } else {
+ result = GST_FLOW_WAIT;
+ }
}
}
GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
remove_timer (jitterbuffer, timer);
if (!priv->eos) {
+ GstEvent *event;
+
/* there was no EOS in the buffer, put one in there now */
- queue_event (jitterbuffer, gst_event_new_eos ());
+ event = gst_event_new_eos ();
+ if (priv->segment_seqnum != GST_SEQNUM_INVALID)
+ gst_event_set_seqnum (event, priv->segment_seqnum);
+ queue_event (jitterbuffer, event);
}
JBUF_SIGNAL_EVENT (priv);
* otherwise always be 0
*/
GST_OBJECT_LOCK (jitterbuffer);
- if (GST_ELEMENT_CLOCK (jitterbuffer)) {
+ if (priv->eos) {
+ now = GST_CLOCK_TIME_NONE;
+ } else if (GST_ELEMENT_CLOCK (jitterbuffer)) {
now =
gst_clock_get_time (GST_ELEMENT_CLOCK (jitterbuffer)) -
GST_ELEMENT_CAST (jitterbuffer)->base_time;
GstClockReturn ret;
GstClockTimeDiff clock_jitter;
- if (timer_timeout == -1 || timer_timeout <= now) {
+ if (timer_timeout == -1 || timer_timeout <= now || priv->eos) {
/* We have normally removed all lost timers in the loop above */
g_assert (timer->type != TIMER_TYPE_LOST);
JBUF_LOCK_CHECK (priv, flushing);
do {
result = handle_next_buffer (jitterbuffer);
+ JBUF_SIGNAL_QUEUE (priv);
if (G_LIKELY (result == GST_FLOW_WAIT)) {
/* now wait for the next event */
JBUF_WAIT_EVENT (priv, flushing);
gst_pad_pause_task (priv->srcpad);
if (result == GST_FLOW_EOS) {
event = gst_event_new_eos ();
+ if (priv->segment_seqnum != GST_SEQNUM_INVALID)
+ gst_event_set_seqnum (event, priv->segment_seqnum);
gst_pad_push_event (priv->srcpad, event);
}
return;