RTPJitterBuffer *jbuf;
GMutex *jbuf_lock;
GCond *jbuf_cond;
+ gboolean waiting;
/* properties */
guint latency_ms;
GstFlowReturn ret = GST_FLOW_OK;
GstClockTime timestamp;
guint64 latency_ts;
+ gboolean tail;
jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
/* now insert the packet into the queue in sorted order. This function returns
* FALSE if a packet with the same seqnum was already in the queue, meaning we
* have a duplicate. */
- if (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp))
+ if (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp, &tail))
goto duplicate;
- /* signal addition of new buffer */
- JBUF_SIGNAL (priv);
+ /* signal addition of new buffer when the _loop is waiting. */
+ if (priv->waiting)
+ JBUF_SIGNAL (priv);
/* let's unschedule and unblock any waiting buffers. We only want to do this
- * if there is a currently waiting newer (> seqnum) buffer */
- if (priv->clock_id) {
- if (priv->waiting_seqnum > seqnum) {
- gst_clock_id_unschedule (priv->clock_id);
- GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting buffer");
- }
+ * when the tail buffer changed */
+ if (priv->clock_id && tail) {
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "Unscheduling waiting buffer, new tail buffer");
+ gst_clock_id_unschedule (priv->clock_id);
}
GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets",
*
* For each pushed buffer, the seqnum is recorded, if the next buffer B has a
* different seqnum (missing packets before B), this function will wait for the
- * missing packet to arrive up to the rtp timestamp of buffer B.
+ * missing packet to arrive up to the timestamp of buffer B.
*/
static void
gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
JBUF_LOCK_CHECK (priv, flushing);
again:
- GST_DEBUG_OBJECT (jitterbuffer, "Popping item");
+ GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
while (TRUE) {
/* always wait if we are blocked */
goto do_eos;
}
/* wait for packets or flushing now */
+ priv->waiting = TRUE;
JBUF_WAIT_CHECK (priv, flushing);
+ priv->waiting = FALSE;
}
- /* pop a buffer, we must have a buffer now */
- outbuf = rtp_jitter_buffer_pop (priv->jbuf);
+ /* peek a buffer, we're just looking at the timestamp and the sequence number.
+ * If all is fine, we'll pop and push it. If the sequence number is wrong we
+ * wait on the timestamp. In the chain function we will unlock the wait when a
+ * new buffer is available. The peeked buffer is valid for as long as we hold
+ * the jitterbuffer lock. */
+ outbuf = rtp_jitter_buffer_peek (priv->jbuf);
seqnum = gst_rtp_buffer_get_seq (outbuf);
/* get the timestamp, this is already corrected for clock skew by the
timestamp = GST_BUFFER_TIMESTAMP (outbuf);
GST_DEBUG_OBJECT (jitterbuffer,
- "Popped buffer #%d, timestamp %" GST_TIME_FORMAT ", now %d left",
+ "Peeked buffer #%d, timestamp %" GST_TIME_FORMAT ", now %d left",
seqnum, GST_TIME_ARGS (timestamp),
rtp_jitter_buffer_num_packets (priv->jbuf));
if (ret == GST_CLOCK_UNSCHEDULED) {
GST_DEBUG_OBJECT (jitterbuffer,
"Wait got unscheduled, will retry to push with new buffer");
- /* reinsert popped buffer into queue, no need to recalculate skew, we do
- * that when inserting the buffer in the chain function */
- if (!rtp_jitter_buffer_insert (priv->jbuf, outbuf, -1)) {
- GST_DEBUG_OBJECT (jitterbuffer,
- "Duplicate packet #%d detected, dropping", seqnum);
- priv->num_duplicates++;
- gst_buffer_unref (outbuf);
- }
goto again;
}
/* Get new timestamp, latency might have changed */
out_time = apply_offset (jitterbuffer, timestamp);
}
push_buffer:
+ /* when we get here we are ready to pop and push the buffer */
+ outbuf = rtp_jitter_buffer_pop (priv->jbuf);
+
/* check if we are pushing something unexpected */
if (priv->next_seqnum != -1 && priv->next_seqnum != seqnum) {
gint dropped;