gst-libs/gst/rtp/gstbasertpdepayload.c: Check sequence numbers, mark input buffers...
authorWim Taymans <wim.taymans@gmail.com>
Fri, 23 May 2008 14:14:28 +0000 (14:14 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Fri, 23 May 2008 14:14:28 +0000 (14:14 +0000)
Original commit message from CVS:
* gst-libs/gst/rtp/gstbasertpdepayload.c:
(gst_base_rtp_depayload_chain),
(gst_base_rtp_depayload_handle_sink_event),
(gst_base_rtp_depayload_push_full),
(gst_base_rtp_depayload_change_state):
Check sequence numbers, mark input buffers with a discont flag for the
subclass when we detected a gap, drop duplicate buffers. We do this
because one can use the element without a jitterbuffer in front and we
don't want to feed the subclasses invalid or reordered data.
Do an error when the subclass did not provide a process function instead
of crashing.
Some other small cleanups.

ChangeLog
gst-libs/gst/rtp/gstbasertpdepayload.c

index f4e9127ccaef1efd130a9016066bb71883f74ef5..3209e286753f956c7ce796294e35fca2cac4cb56 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,18 @@
+2008-05-23  Wim Taymans  <wim.taymans@collabora.co.uk>
+
+       * gst-libs/gst/rtp/gstbasertpdepayload.c:
+       (gst_base_rtp_depayload_chain),
+       (gst_base_rtp_depayload_handle_sink_event),
+       (gst_base_rtp_depayload_push_full),
+       (gst_base_rtp_depayload_change_state):
+       Check sequence numbers, mark input buffers with a discont flag for the
+       subclass when we detected a gap, drop duplicate buffers. We do this
+       because one can use the element without a jitterbuffer in front and we
+       don't want to feed the subclasses invalid or reordered data.
+       Do an error when the subclass did not provide a process function instead
+       of crashing.
+       Some other small cleanups.
+
 2008-05-22  Tim-Philipp Müller  <tim.muller at collabora co uk>
 
        * gst/videotestsrc/videotestsrc.c: (paint_hline_NV12_NV21):
index 519265f457745075de82bb92ecccea3945a49b20..21dbca236ddaed6ae5764dc99c8f51a80801b8e6 100644 (file)
@@ -56,6 +56,8 @@ struct _GstBaseRTPDepayloadPrivate
   gboolean discont;
   GstClockTime timestamp;
   GstClockTime duration;
+
+  guint32 next_seqnum;
 };
 
 /* Filter signals and args */
@@ -255,25 +257,79 @@ gst_base_rtp_depayload_chain (GstPad * pad, GstBuffer * in)
   GstFlowReturn ret = GST_FLOW_OK;
   GstBuffer *out_buf;
   GstClockTime timestamp;
+  guint16 seqnum;
+  gboolean reset_seq, discont;
+  gint gap;
 
   filter = GST_BASE_RTP_DEPAYLOAD (GST_OBJECT_PARENT (pad));
 
+  /* we must validate, it's possible that this element is plugged right after a
+   * network receiver and we don't want to operate on invalid data */
   if (!gst_rtp_buffer_validate (in))
     goto invalid_buffer;
 
   priv = filter->priv;
   priv->discont = GST_BUFFER_IS_DISCONT (in);
 
+  timestamp = GST_BUFFER_TIMESTAMP (in);
   /* convert to running_time and save the timestamp, this is the timestamp
    * we put on outgoing buffers. */
-  timestamp = GST_BUFFER_TIMESTAMP (in);
   timestamp = gst_segment_to_running_time (&filter->segment, GST_FORMAT_TIME,
       timestamp);
   priv->timestamp = timestamp;
   priv->duration = GST_BUFFER_DURATION (in);
 
+  seqnum = gst_rtp_buffer_get_seq (in);
+  reset_seq = TRUE;
+  discont = FALSE;
+
+  GST_LOG_OBJECT (filter, "discont %d, seqnum %u, timestamp %"
+      GST_TIME_FORMAT, priv->discont, seqnum, GST_TIME_ARGS (timestamp));
+
+  /* Check seqnum. This is a very simple check that makes sure that the seqnums
+   * are striclty increasing, dropping anything that is out of the ordinary. We
+   * can only do this when the next_seqnum is known. */
+  if (priv->next_seqnum != -1) {
+    gap = gst_rtp_buffer_compare_seqnum (seqnum, priv->next_seqnum);
+
+    /* if we have no gap, all is fine */
+    if (G_UNLIKELY (gap != 0)) {
+      GST_LOG_OBJECT (filter, "got packet %u, expected %u, gap %d", seqnum,
+          priv->next_seqnum, gap);
+      if (gap < 0) {
+        /* seqnum > next_seqnum, we are missing some packets, this is always a
+         * DISCONT. */
+        GST_LOG_OBJECT (filter, "%d missing packets", gap);
+        discont = TRUE;
+      } else {
+        /* seqnum < next_seqnum, we have seen this packet before or the sender
+         * could be restarted. If the packet is not too old, we throw it away as
+         * a duplicate, otherwise we mark discont and continue. 100 misordered
+         * packets is a good threshold. See also RFC 4737. */
+        if (gap < 100)
+          goto dropping;
+
+        GST_LOG_OBJECT (filter,
+            "%d > 100, packet too old, sender likely restarted", gap);
+        discont = TRUE;
+      }
+    }
+  }
+  priv->next_seqnum = (seqnum + 1) & 0xffff;
+
+  if (discont && !priv->discont) {
+    GST_LOG_OBJECT (filter, "mark DISCONT on input buffer");
+    /* we detected a seqnum discont but the buffer was not flagged with a discont,
+     * set the discont flag so that the subclass can throw away old data. */
+    priv->discont = TRUE;
+    GST_BUFFER_FLAG_SET (in, GST_BUFFER_FLAG_DISCONT);
+  }
+
   bclass = GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter);
 
+  if (G_UNLIKELY (bclass->process == NULL))
+    goto no_process;
+
   /* let's send it out to processing */
   out_buf = bclass->process (filter, in);
   if (out_buf) {
@@ -298,6 +354,20 @@ invalid_buffer:
     gst_buffer_unref (in);
     return GST_FLOW_OK;
   }
+dropping:
+  {
+    GST_WARNING_OBJECT (filter, "%d <= 100, dropping old packet", gap);
+    gst_buffer_unref (in);
+    return GST_FLOW_OK;
+  }
+no_process:
+  {
+    /* this is not fatal but should be filtered earlier */
+    GST_ELEMENT_ERROR (filter, STREAM, NOT_IMPLEMENTED, (NULL),
+        ("The subclass does not have a process method"));
+    gst_buffer_unref (in);
+    return GST_FLOW_ERROR;
+  }
 }
 
 static gboolean
@@ -314,6 +384,7 @@ gst_base_rtp_depayload_handle_sink_event (GstPad * pad, GstEvent * event)
 
       gst_segment_init (&filter->segment, GST_FORMAT_UNDEFINED);
       filter->need_newsegment = TRUE;
+      filter->priv->next_seqnum = -1;
       break;
     case GST_EVENT_NEWSEGMENT:
     {
@@ -377,7 +448,7 @@ gst_base_rtp_depayload_push_full (GstBaseRTPDepayload * filter,
 
   /* set the caps if any */
   srccaps = GST_PAD_CAPS (filter->srcpad);
-  if (srccaps)
+  if (G_LIKELY (srccaps))
     gst_buffer_set_caps (out_buf, srccaps);
 
   bclass = GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter);
@@ -386,7 +457,8 @@ gst_base_rtp_depayload_push_full (GstBaseRTPDepayload * filter,
   if (bclass->set_gst_timestamp && do_ts)
     bclass->set_gst_timestamp (filter, rtptime, out_buf);
 
-  if (priv->discont) {
+  if (G_UNLIKELY (priv->discont)) {
+    GST_LOG_OBJECT (filter, "Marking DISCONT on output buffer");
     GST_BUFFER_FLAG_SET (out_buf, GST_BUFFER_FLAG_DISCONT);
     priv->discont = FALSE;
   }
@@ -395,8 +467,8 @@ gst_base_rtp_depayload_push_full (GstBaseRTPDepayload * filter,
   GST_LOG_OBJECT (filter, "Pushing buffer size %d, timestamp %" GST_TIME_FORMAT,
       GST_BUFFER_SIZE (out_buf),
       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (out_buf)));
+
   ret = gst_pad_push (filter->srcpad, out_buf);
-  GST_LOG_OBJECT (filter, "Pushed buffer: %s", gst_flow_get_name (ret));
 
   return ret;
 }
@@ -548,6 +620,7 @@ gst_base_rtp_depayload_change_state (GstElement * element,
       priv->npt_stop = -1;
       priv->play_speed = 1.0;
       priv->play_scale = 1.0;
+      filter->priv->next_seqnum = -1;
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
       break;