ulpfecdec: output perfect seqnums
authorMathieu Duponchelle <mathieu@centricular.com>
Mon, 2 Apr 2018 14:06:35 +0000 (16:06 +0200)
committerMathieu Duponchelle <mathieu@centricular.com>
Thu, 19 Apr 2018 16:17:39 +0000 (18:17 +0200)
ULP FEC, as defined in RFC 5109, has the protected and protection
packets sharing the same ssrc, and a different payload type, and
implies rewriting the seqnums of the protected stream when encoding
the protection packets. This has the unfortunate drawback of not
being able to tell whether a lost packet was a protection packet.

rtpbasedepayload relies on gaps in the seqnums to set the DISCONT
flag on buffers it outputs. Before that commit, this created two
problems:

* The protection packets don't make it as far as the depayloader,
  which means it will mark buffers as DISCONT every time the previous
  packets were protected

* While we could work around the previous issue by looking at
  the protection packets ignored and dropped in rtpptdemux, we
  would still mark buffers as DISCONT when a FEC packet was lost,
  as we cannot know that it was indeed a FEC packet, even though
  this should have no impact on the decoding of the stream

With this commit, we consider that when using ULPFEC, gaps in
the seqnums are not a reliable indicator of whether buffers should
be marked as DISCONT or not, and thus rewrite the seqnums on
the decoding side as well to form a perfect sequence, this
obviously doesn't prevent the jitterbuffer from doing its job
as the ulpfec decoder is downstream from it.

https://bugzilla.gnome.org/show_bug.cgi?id=794909

gst/rtp/gstrtpulpfecdec.c
gst/rtp/gstrtpulpfecdec.h
tests/check/elements/rtpulpfec.c

index 9cb4997..da29371 100644 (file)
@@ -363,11 +363,18 @@ gst_rtp_ulpfec_dec_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
   GstRtpUlpFecDec *self = GST_RTP_ULPFEC_DEC (parent);
 
   if (G_LIKELY (GST_FLOW_OK == self->chain_return_val)) {
+    GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+    buf = gst_buffer_make_writable (buf);
+
     if (G_UNLIKELY (self->unset_discont_flag)) {
       self->unset_discont_flag = FALSE;
-      buf = gst_buffer_make_writable (buf);
       GST_BUFFER_FLAG_UNSET (buf, GST_BUFFER_FLAG_DISCONT);
     }
+
+    gst_rtp_buffer_map (buf, GST_MAP_WRITE, &rtp);
+    gst_rtp_buffer_set_seq (&rtp, self->next_seqnum++);
+    gst_rtp_buffer_unmap (&rtp);
+
     return gst_pad_push (self->srcpad, buf);
   }
 
@@ -396,6 +403,9 @@ gst_rtp_ulpfec_dec_handle_packet_loss (GstRtpUlpFecDec * self, guint16 seqnum,
             gst_rtp_ulpfec_dec_recover (self, self->caps_ssrc, caps_pt,
                 &recovered_pt, &recovered_seq))) {
       if (seqnum == recovered_seq) {
+        GstBuffer *sent_buffer;
+        GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+
         recovered_buffer = gst_buffer_make_writable (recovered_buffer);
         GST_BUFFER_PTS (recovered_buffer) = timestamp;
         /* GST_BUFFER_DURATION (recovered_buffer) = duration;
@@ -403,16 +413,21 @@ gst_rtp_ulpfec_dec_handle_packet_loss (GstRtpUlpFecDec * self, guint16 seqnum,
 
         if (!self->lost_packet_from_storage)
           rtp_storage_put_recovered_packet (self->storage,
-              gst_buffer_ref (recovered_buffer), recovered_pt, self->caps_ssrc,
-              recovered_seq);
+              recovered_buffer, recovered_pt, self->caps_ssrc, recovered_seq);
 
         GST_DEBUG_OBJECT (self,
             "Pushing recovered packet ssrc=0x%08x seq=%u %" GST_PTR_FORMAT,
             self->caps_ssrc, seqnum, recovered_buffer);
 
+        sent_buffer = gst_buffer_copy_deep (recovered_buffer);
+
+        gst_rtp_buffer_map (sent_buffer, GST_MAP_WRITE, &rtp);
+        gst_rtp_buffer_set_seq (&rtp, self->next_seqnum++);
+        gst_rtp_buffer_unmap (&rtp);
+
         ret = FALSE;
         self->unset_discont_flag = TRUE;
-        self->chain_return_val = gst_pad_push (self->srcpad, recovered_buffer);
+        self->chain_return_val = gst_pad_push (self->srcpad, sent_buffer);
         break;
       }
 
@@ -444,11 +459,15 @@ gst_rtp_ulpfec_dec_handle_sink_event (GstPad * pad, GstObject * parent,
       gst_event_has_name (event, "GstRTPPacketLost")) {
     guint seqnum;
     GstClockTime timestamp, duration;
+    GstStructure *s;
+
+    event = gst_event_make_writable (event);
+    s = gst_event_writable_structure (event);
 
     g_assert (self->have_caps_ssrc);
     g_assert (self->storage);
 
-    if (!gst_structure_get (gst_event_get_structure (event),
+    if (!gst_structure_get (s,
             "seqnum", G_TYPE_UINT, &seqnum,
             "timestamp", G_TYPE_UINT64, &timestamp,
             "duration", G_TYPE_UINT64, &duration, NULL))
@@ -457,10 +476,15 @@ gst_rtp_ulpfec_dec_handle_sink_event (GstPad * pad, GstObject * parent,
     forward =
         gst_rtp_ulpfec_dec_handle_packet_loss (self, seqnum, timestamp,
         duration);
-    if (forward)
+
+    if (forward) {
+      gst_structure_remove_field (s, "seqnum");
+      gst_structure_set (s, "might-have-been-fec", G_TYPE_BOOLEAN, TRUE, NULL);
       ++self->packets_unrecovered;
-    else
+    } else {
       ++self->packets_recovered;
+    }
+
     GST_DEBUG_OBJECT (self, "Unrecovered / Recovered: %lu / %lu",
         (gulong) self->packets_unrecovered, (gulong) self->packets_recovered);
   } else if (GST_EVENT_CAPS == GST_EVENT_TYPE (event)) {
@@ -514,6 +538,8 @@ gst_rtp_ulpfec_dec_init (GstRtpUlpFecDec * self)
 
   self->fec_pt = DEFAULT_FEC_PT;
 
+  self->next_seqnum = g_random_int_range (0, G_MAXINT16);
+
   self->chain_return_val = GST_FLOW_OK;
   self->have_caps_ssrc = FALSE;
   self->caps_ssrc = 0;
index 4eedc27..f9b10b0 100644 (file)
@@ -69,6 +69,7 @@ struct _GstRtpUlpFecDec {
   GArray *scratch_buf;
   gboolean lost_packet_from_storage;
   gboolean lost_packet_returned;
+  guint16 next_seqnum;
 
   /* stats */
   gsize fec_packets_received;
index 13ac19b..e0fa972 100644 (file)
@@ -74,7 +74,20 @@ push_lost_event (GstHarness * h, guint32 seqnum,
   }
 
   if (event_goes_through) {
-    fail_unless (packet_loss_in == packet_loss_out);
+    const GstStructure *s = gst_event_get_structure (packet_loss_out);
+    guint64 tscopy, durcopy;
+    gboolean might_have_been_fec;
+
+    fail_unless (gst_structure_has_name (s, "GstRTPPacketLost"));
+    fail_if (gst_structure_has_field (s, "seqnum"));
+    fail_unless (gst_structure_get_uint64 (s, "timestamp", &tscopy));
+    fail_unless (gst_structure_get_uint64 (s, "duration", &durcopy));
+    fail_unless (gst_structure_get_boolean (s, "might-have-been-fec",
+            &might_have_been_fec));
+
+    fail_unless_equals_uint64 (timestamp, tscopy);
+    fail_unless_equals_uint64 (duration, durcopy);
+    fail_unless (might_have_been_fec == TRUE);
     gst_event_unref (packet_loss_out);
   } else {
     fail_unless (NULL == packet_loss_out);
@@ -88,18 +101,31 @@ lose_and_recover_test (GstHarness * h, guint16 lost_seq,
   guint64 duration = 222222;
   guint64 timestamp = 111111;
   GstBuffer *bufout;
+  GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+  GstRTPBuffer rtpout = GST_RTP_BUFFER_INIT;
+  GstBuffer *wrap;
+  gpointer reccopy = g_malloc (recbuf_size);
+
+  memcpy (reccopy, recbuf, recbuf_size);
 
   push_lost_event (h, lost_seq, timestamp, duration, FALSE);
 
   bufout = gst_harness_pull (h);
   fail_unless_equals_int (gst_buffer_get_size (bufout), recbuf_size);
   fail_unless_equals_int (GST_BUFFER_PTS (bufout), timestamp);
-  fail_unless (gst_buffer_memcmp (bufout, 0, recbuf, recbuf_size) == 0);
+  wrap = gst_buffer_new_wrapped (reccopy, recbuf_size);
+  gst_rtp_buffer_map (wrap, GST_MAP_WRITE, &rtp);
+  gst_rtp_buffer_map (bufout, GST_MAP_READ, &rtpout);
+  gst_rtp_buffer_set_seq (&rtp, gst_rtp_buffer_get_seq (&rtpout));
+  fail_unless (gst_buffer_memcmp (bufout, 0, reccopy, recbuf_size) == 0);
+  gst_rtp_buffer_unmap (&rtp);
+  gst_rtp_buffer_unmap (&rtpout);
   fail_unless (!GST_BUFFER_FLAG_IS_SET (bufout, GST_RTP_BUFFER_FLAG_REDUNDANT));
   gst_buffer_unref (bufout);
+  g_free (reccopy);
 
   /* Pushing the next buffer with discont flag set */
-  bufout = gst_buffer_new ();
+  bufout = gst_rtp_buffer_new_allocate (0, 0, 0);
   GST_BUFFER_FLAG_SET (bufout, GST_BUFFER_FLAG_DISCONT);
   bufout = gst_harness_push_and_pull (h, bufout);
   /* Checking the flag was unset */