guint64 ips_rtptime;
GstClockTime packet_spacing;
+ GQueue gap_packets;
+
/* the next expected seqnum we receive */
GstClockTime last_in_dts;
guint32 last_in_seqnum;
g_cond_init (&priv->jbuf_timer);
g_cond_init (&priv->jbuf_event);
g_cond_init (&priv->jbuf_query);
+ g_queue_init (&priv->gap_packets);
/* reset skew detection initialy */
rtp_jitter_buffer_reset_skew (priv->jbuf);
g_cond_clear (&priv->jbuf_query);
rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
+ g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
+ g_queue_clear (&priv->gap_packets);
g_object_unref (priv->jbuf);
G_OBJECT_CLASS (parent_class)->finalize (object);
rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
rtp_jitter_buffer_reset_skew (priv->jbuf);
remove_all_timers (jitterbuffer);
+ g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
+ g_queue_clear (&priv->gap_packets);
JBUF_UNLOCK (priv);
}
}
}
+static gint
+compare_buffer_seqnum (GstBuffer * a, GstBuffer * b, gpointer user_data)
+{
+ GstRTPBuffer rtp_a = GST_RTP_BUFFER_INIT;
+ GstRTPBuffer rtp_b = GST_RTP_BUFFER_INIT;
+ guint seq_a, seq_b;
+
+ gst_rtp_buffer_map (a, GST_MAP_READ, &rtp_a);
+ seq_a = gst_rtp_buffer_get_seq (&rtp_a);
+ gst_rtp_buffer_unmap (&rtp_a);
+
+ gst_rtp_buffer_map (b, GST_MAP_READ, &rtp_b);
+ seq_b = gst_rtp_buffer_get_seq (&rtp_b);
+ gst_rtp_buffer_unmap (&rtp_b);
+
+ return gst_rtp_buffer_compare_seqnum (seq_b, seq_a);
+}
+
+static gboolean
+handle_big_gap_buffer (GstRtpJitterBuffer * jitterbuffer, gboolean future,
+ GstBuffer * buffer, guint8 pt, guint16 seqnum, gint gap)
+{
+ GstRtpJitterBufferPrivate *priv;
+ guint gap_packets_length;
+ gboolean reset = FALSE;
+
+ priv = jitterbuffer->priv;
+
+ if ((gap_packets_length = g_queue_get_length (&priv->gap_packets)) > 0) {
+ GList *l;
+ guint32 prev_gap_seq = -1;
+ gboolean all_consecutive = TRUE;
+
+ g_queue_insert_sorted (&priv->gap_packets, buffer,
+ (GCompareDataFunc) compare_buffer_seqnum, NULL);
+
+ for (l = priv->gap_packets.head; l; l = l->next) {
+ GstBuffer *gap_buffer = l->data;
+ GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
+ guint32 gap_seq;
+
+ gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
+
+ all_consecutive = (gst_rtp_buffer_get_payload_type (&gap_rtp) == pt);
+
+ gap_seq = gst_rtp_buffer_get_seq (&gap_rtp);
+ if (prev_gap_seq == -1)
+ prev_gap_seq = gap_seq;
+ else if (gst_rtp_buffer_compare_seqnum (gap_seq, prev_gap_seq) != -1)
+ all_consecutive = FALSE;
+ else
+ prev_gap_seq = gap_seq;
+
+ gst_rtp_buffer_unmap (&gap_rtp);
+ if (!all_consecutive)
+ break;
+ }
+
+ if (all_consecutive && gap_packets_length > 3) {
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "buffer too %s %d < %d, got 5 consecutive ones - reset",
+ (future ? "new" : "old"), gap,
+ (future ? RTP_MAX_DROPOUT : -RTP_MAX_MISORDER));
+ reset = TRUE;
+ } else if (!all_consecutive) {
+ g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
+ g_queue_clear (&priv->gap_packets);
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "buffer too %s %d < %d, got no 5 consecutive ones - dropping",
+ (future ? "new" : "old"), gap,
+ (future ? RTP_MAX_DROPOUT : -RTP_MAX_MISORDER));
+ buffer = NULL;
+ } else {
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "buffer too %s %d < %d, got %u consecutive ones - waiting",
+ (future ? "new" : "old"), gap,
+ (future ? RTP_MAX_DROPOUT : -RTP_MAX_MISORDER),
+ gap_packets_length + 1);
+ buffer = NULL;
+ }
+ } else {
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "buffer too %s %d < %d, first one - waiting", (future ? "new" : "old"),
+ gap, -RTP_MAX_MISORDER);
+ g_queue_push_tail (&priv->gap_packets, buffer);
+ buffer = NULL;
+ }
+
+ return reset;
+}
+
static GstFlowReturn
gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
GstBuffer * buffer)
goto gap_but_no_dts;
} else if (gap < 0) {
/* we received an old packet */
- if (G_UNLIKELY (gap < -RTP_MAX_MISORDER)) {
- /* too old packet, reset */
- GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d < %d", gap,
- -RTP_MAX_MISORDER);
- reset = TRUE;
+ if (G_UNLIKELY (gap != -1 && gap < -RTP_MAX_MISORDER)) {
+ reset =
+ handle_big_gap_buffer (jitterbuffer, FALSE, buffer, pt, seqnum,
+ gap);
+ buffer = NULL;
} else {
GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
}
} else {
/* new packet, we are missing some packets */
- if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
- /* packet too far in future, reset */
- GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too new %d > %d", gap,
- RTP_MAX_DROPOUT);
- reset = TRUE;
+ if (G_UNLIKELY (gap >= RTP_MAX_DROPOUT)) {
+ reset =
+ handle_big_gap_buffer (jitterbuffer, TRUE, buffer, pt, seqnum,
+ gap);
+ buffer = NULL;
} else {
GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
/* fill in the gap with EXPECTED timers */
}
if (G_UNLIKELY (reset)) {
GList *events = NULL, *l;
+ GList *buffers;
GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
rtp_jitter_buffer_flush (priv->jbuf,
priv->discont = TRUE;
priv->last_popped_seqnum = -1;
priv->next_seqnum = seqnum;
- do_next_seqnum = TRUE;
+
+ priv->last_in_seqnum = -1;
+ priv->last_in_dts = -1;
+ priv->next_in_seqnum = -1;
/* Insert all sticky events again in order, otherwise we would
* potentially loose STREAM_START, CAPS or SEGMENT events
g_list_free (events);
JBUF_SIGNAL_EVENT (priv);
+
+ /* reset spacing estimation when gap */
+ priv->ips_rtptime = -1;
+ priv->ips_dts = GST_CLOCK_TIME_NONE;
+
+ buffers = g_list_copy (priv->gap_packets.head);
+ g_queue_clear (&priv->gap_packets);
+
+ priv->ips_rtptime = -1;
+ priv->ips_dts = GST_CLOCK_TIME_NONE;
+ JBUF_UNLOCK (jitterbuffer->priv);
+
+ for (l = buffers; l; l = l->next) {
+ ret = gst_rtp_jitter_buffer_chain (pad, parent, l->data);
+ l->data = NULL;
+ if (ret != GST_FLOW_OK)
+ break;
+ }
+ for (; l; l = l->next)
+ gst_buffer_unref (l->data);
+ g_list_free (buffers);
+
+ return ret;
}
/* reset spacing estimation when gap */
priv->ips_rtptime = -1;
priv->ips_rtptime = rtptime;
priv->ips_dts = dts;
}
+
+ /* We had no huge gap, let's drop all the gap packets */
+ if (buffer != NULL) {
+ GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets");
+ g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
+ g_queue_clear (&priv->gap_packets);
+ } else {
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "Had big gap, waiting for more consecutive packets");
+ JBUF_UNLOCK (jitterbuffer->priv);
+ return GST_FLOW_OK;
+ }
+
if (do_next_seqnum) {
priv->last_in_seqnum = seqnum;
priv->last_in_dts = dts;
break;
case ITEM_TYPE_LOST:
case ITEM_TYPE_EVENT:
+ /* We got not enough consecutive packets with a huge gap, we can
+ * as well just drop them here now on EOS */
+ if (GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
+ GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets on EOS");
+ g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
+ g_queue_clear (&priv->gap_packets);
+ }
+
GST_DEBUG_OBJECT (jitterbuffer, "%sPushing event %" GST_PTR_FORMAT
", seqnum %d", do_push ? "" : "NOT ", outevent, seqnum);