gboolean discont;
GstClockTime timestamp;
GstClockTime duration;
+
+ guint32 next_seqnum;
};
/* Filter signals and args */
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *out_buf;
GstClockTime timestamp;
+ guint16 seqnum;
+ gboolean reset_seq, discont;
+ gint gap;
filter = GST_BASE_RTP_DEPAYLOAD (GST_OBJECT_PARENT (pad));
+ /* we must validate, it's possible that this element is plugged right after a
+ * network receiver and we don't want to operate on invalid data */
if (!gst_rtp_buffer_validate (in))
goto invalid_buffer;
priv = filter->priv;
priv->discont = GST_BUFFER_IS_DISCONT (in);
+ timestamp = GST_BUFFER_TIMESTAMP (in);
/* convert to running_time and save the timestamp, this is the timestamp
* we put on outgoing buffers. */
- timestamp = GST_BUFFER_TIMESTAMP (in);
timestamp = gst_segment_to_running_time (&filter->segment, GST_FORMAT_TIME,
timestamp);
priv->timestamp = timestamp;
priv->duration = GST_BUFFER_DURATION (in);
+ seqnum = gst_rtp_buffer_get_seq (in);
+ reset_seq = TRUE;
+ discont = FALSE;
+
+ GST_LOG_OBJECT (filter, "discont %d, seqnum %u, timestamp %"
+ GST_TIME_FORMAT, priv->discont, seqnum, GST_TIME_ARGS (timestamp));
+
+ /* Check seqnum. This is a very simple check that makes sure that the seqnums
+ * are striclty increasing, dropping anything that is out of the ordinary. We
+ * can only do this when the next_seqnum is known. */
+ if (priv->next_seqnum != -1) {
+ gap = gst_rtp_buffer_compare_seqnum (seqnum, priv->next_seqnum);
+
+ /* if we have no gap, all is fine */
+ if (G_UNLIKELY (gap != 0)) {
+ GST_LOG_OBJECT (filter, "got packet %u, expected %u, gap %d", seqnum,
+ priv->next_seqnum, gap);
+ if (gap < 0) {
+ /* seqnum > next_seqnum, we are missing some packets, this is always a
+ * DISCONT. */
+ GST_LOG_OBJECT (filter, "%d missing packets", gap);
+ discont = TRUE;
+ } else {
+ /* seqnum < next_seqnum, we have seen this packet before or the sender
+ * could be restarted. If the packet is not too old, we throw it away as
+ * a duplicate, otherwise we mark discont and continue. 100 misordered
+ * packets is a good threshold. See also RFC 4737. */
+ if (gap < 100)
+ goto dropping;
+
+ GST_LOG_OBJECT (filter,
+ "%d > 100, packet too old, sender likely restarted", gap);
+ discont = TRUE;
+ }
+ }
+ }
+ priv->next_seqnum = (seqnum + 1) & 0xffff;
+
+ if (discont && !priv->discont) {
+ GST_LOG_OBJECT (filter, "mark DISCONT on input buffer");
+ /* we detected a seqnum discont but the buffer was not flagged with a discont,
+ * set the discont flag so that the subclass can throw away old data. */
+ priv->discont = TRUE;
+ GST_BUFFER_FLAG_SET (in, GST_BUFFER_FLAG_DISCONT);
+ }
+
bclass = GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter);
+ if (G_UNLIKELY (bclass->process == NULL))
+ goto no_process;
+
/* let's send it out to processing */
out_buf = bclass->process (filter, in);
if (out_buf) {
gst_buffer_unref (in);
return GST_FLOW_OK;
}
+dropping:
+ {
+ GST_WARNING_OBJECT (filter, "%d <= 100, dropping old packet", gap);
+ gst_buffer_unref (in);
+ return GST_FLOW_OK;
+ }
+no_process:
+ {
+ /* this is not fatal but should be filtered earlier */
+ GST_ELEMENT_ERROR (filter, STREAM, NOT_IMPLEMENTED, (NULL),
+ ("The subclass does not have a process method"));
+ gst_buffer_unref (in);
+ return GST_FLOW_ERROR;
+ }
}
static gboolean
gst_segment_init (&filter->segment, GST_FORMAT_UNDEFINED);
filter->need_newsegment = TRUE;
+ filter->priv->next_seqnum = -1;
break;
case GST_EVENT_NEWSEGMENT:
{
/* set the caps if any */
srccaps = GST_PAD_CAPS (filter->srcpad);
- if (srccaps)
+ if (G_LIKELY (srccaps))
gst_buffer_set_caps (out_buf, srccaps);
bclass = GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter);
if (bclass->set_gst_timestamp && do_ts)
bclass->set_gst_timestamp (filter, rtptime, out_buf);
- if (priv->discont) {
+ if (G_UNLIKELY (priv->discont)) {
+ GST_LOG_OBJECT (filter, "Marking DISCONT on output buffer");
GST_BUFFER_FLAG_SET (out_buf, GST_BUFFER_FLAG_DISCONT);
priv->discont = FALSE;
}
GST_LOG_OBJECT (filter, "Pushing buffer size %d, timestamp %" GST_TIME_FORMAT,
GST_BUFFER_SIZE (out_buf),
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (out_buf)));
+
ret = gst_pad_push (filter->srcpad, out_buf);
- GST_LOG_OBJECT (filter, "Pushed buffer: %s", gst_flow_get_name (ret));
return ret;
}
priv->npt_stop = -1;
priv->play_speed = 1.0;
priv->play_scale = 1.0;
+ filter->priv->next_seqnum = -1;
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
break;