avtp: Add fragmented packets handling to CVF depayloader
authorEderson de Souza <ederson.desouza@intel.com>
Wed, 20 Mar 2019 23:40:13 +0000 (16:40 -0700)
committerEderson de Souza <ederson.desouza@intel.com>
Wed, 3 Jul 2019 16:59:35 +0000 (09:59 -0700)
This patch adds to the CVF depayloader the capability to regroup H.264
fragmented FU-A packets.

After all packets are regrouped, they are added to the "stash" of H.264
NAL units that will be sent as soon as an AVTP packet with M bit set is
found (usually, the last fragment).

Unrecognized fragments (such as first fragment seen, but with no Start
bit set) are discarded - and any NAL units on the "stash" are sent
downstream, as if a SEQNUM discontinuty happened.

ext/avtp/gstavtpcvfdepay.c
ext/avtp/gstavtpcvfdepay.h

index 05a6df0..4ca7572 100644 (file)
@@ -61,6 +61,12 @@ static GstFlowReturn gst_avtp_cvf_depay_chain (GstPad * pad, GstObject * parent,
 #define FU_A_TYPE   28
 #define FU_B_TYPE   29
 
+#define NRI_MASK        0x60
+#define NRI_SHIFT       5
+#define START_MASK      0x80
+#define START_SHIFT     7
+#define END_MASK        0x40
+#define END_SHIFT       6
 #define NAL_TYPE_MASK   0x1f
 
 /* pad templates */
@@ -100,6 +106,7 @@ static void
 gst_avtp_cvf_depay_init (GstAvtpCvfDepay * avtpcvfdepay)
 {
   avtpcvfdepay->out_buffer = NULL;
+  avtpcvfdepay->fragments = NULL;
   avtpcvfdepay->seqnum = 0;
 }
 
@@ -238,6 +245,13 @@ gst_avtp_cvf_depay_push_and_discard (GstAvtpCvfDepay * avtpcvfdepay)
 
     gst_avtp_cvf_depay_push (avtpcvfdepay);
   }
+
+  /* Discard any incomplete fragments */
+  if (avtpcvfdepay->fragments != NULL) {
+    GST_DEBUG_OBJECT (avtpcvfdepay, "Discarding incomplete fragments");
+    gst_buffer_unref (avtpcvfdepay->fragments);
+    avtpcvfdepay->fragments = NULL;
+  }
 }
 
 static gboolean
@@ -446,11 +460,138 @@ gst_avtp_cvf_depay_get_nalu_size (GstAvtpCvfDepay * avtpcvfdepay,
 }
 
 static GstFlowReturn
-gst_avtp_cvf_depay_handle_fragment_a (GstAvtpCvfDepay * avtpcvfdepay,
+gst_avtp_cvf_depay_process_last_fragment (GstAvtpCvfDepay * avtpcvfdepay,
+    GstBuffer * avtpdu, GstMapInfo * map, gsize offset, gsize nalu_size,
+    guint8 nri, guint8 nal_type)
+{
+  GstBuffer *nal;
+  GstMapInfo map_nal;
+  GstClockTime pts, dts;
+  gboolean M;
+  GstFlowReturn ret = GST_FLOW_OK;
+
+  if (G_UNLIKELY (avtpcvfdepay->fragments == NULL)) {
+    GST_WARNING_OBJECT (avtpcvfdepay,
+        "Received final fragment, but no start fragment received. Dropping it.");
+    goto end;
+  }
+
+  gst_buffer_copy_into (avtpcvfdepay->fragments, avtpdu,
+      GST_BUFFER_COPY_MEMORY, offset, nalu_size);
+
+  /* Allocate buffer to keep the nal_header (1 byte) and the NALu size (4 bytes) */
+  nal = gst_buffer_new_allocate (NULL, 4 + 1, NULL);
+  if (G_UNLIKELY (nal == NULL)) {
+    GST_ERROR_OBJECT (avtpcvfdepay, "Could not allocate buffer");
+    ret = GST_FLOW_ERROR;
+    goto end;
+  }
+
+  gst_buffer_map (nal, &map_nal, GST_MAP_READWRITE);
+  /* Add NAL size. Extra 1 counts the nal_header */
+  nalu_size = gst_buffer_get_size (avtpcvfdepay->fragments) + 1;
+  map_nal.data[0] = nalu_size >> 24;
+  map_nal.data[1] = nalu_size >> 16;
+  map_nal.data[2] = nalu_size >> 8;
+  map_nal.data[3] = nalu_size;
+
+  /* Finally, add the nal_header */
+  map_nal.data[4] = (nri << 5) | nal_type;
+
+  gst_buffer_unmap (nal, &map_nal);
+
+  nal = gst_buffer_append (nal, avtpcvfdepay->fragments);
+
+  gst_avtp_cvf_depay_get_avtp_timestamps (avtpcvfdepay, map, &pts, &dts);
+  GST_BUFFER_PTS (nal) = pts;
+  GST_BUFFER_DTS (nal) = dts;
+
+  gst_avtp_cvf_depay_get_M (avtpcvfdepay, map, &M);
+  gst_avtp_cvf_depay_internal_push (avtpcvfdepay, nal, M);
+
+  avtpcvfdepay->fragments = NULL;
+
+end:
+  return ret;
+}
+
+static GstFlowReturn
+gst_avtp_cvf_depay_handle_fu_a (GstAvtpCvfDepay * avtpcvfdepay,
     GstBuffer * avtpdu, GstMapInfo * map)
 {
   GstFlowReturn ret = GST_FLOW_OK;
+  struct avtp_stream_pdu *pdu;
+  struct avtp_cvf_h264_payload *pay;
+  guint8 fu_header, fu_indicator, nal_type, start, end, nri;
+  guint16 nalu_size;
+  gsize offset;
+
+  if (G_UNLIKELY (map->size - AVTP_CVF_H264_HEADER_SIZE < 2)) {
+    GST_ERROR_OBJECT (avtpcvfdepay,
+        "Buffer too small to contain fragment headers, size: %lu",
+        map->size - AVTP_CVF_H264_HEADER_SIZE);
+    gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
+    goto end;
+  }
+
+  pdu = (struct avtp_stream_pdu *) map->data;
+  pay = (struct avtp_cvf_h264_payload *) pdu->avtp_payload;
+  fu_indicator = pay->h264_data[0];
+  nri = (fu_indicator & NRI_MASK) >> NRI_SHIFT;
+
+  GST_DEBUG_OBJECT (avtpcvfdepay, "Fragment indicator - NRI: %u", nri);
+
+  fu_header = pay->h264_data[1];
+  nal_type = fu_header & NAL_TYPE_MASK;
+  start = (fu_header & START_MASK) >> START_SHIFT;
+  end = (fu_header & END_MASK) >> END_SHIFT;
+
+  GST_DEBUG_OBJECT (avtpcvfdepay,
+      "Fragment header - type: %u start: %u end: %u", nal_type, start, end);
+
+  if (G_UNLIKELY (start && end)) {
+    GST_ERROR_OBJECT (avtpcvfdepay,
+        "Invalid fragment header - 'start' and 'end' bits set");
+    gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
+    goto end;
+  }
 
+  /* Size and offset also ignores the FU_HEADER and FU_INDICATOR fields,
+   * hence the "sizeof(guint8) * 2" */
+  offset = AVTP_CVF_H264_HEADER_SIZE + sizeof (guint8) * 2;
+  gst_avtp_cvf_depay_get_nalu_size (avtpcvfdepay, map, &nalu_size);
+  nalu_size -= sizeof (guint8) * 2;
+
+  if (start) {
+    if (G_UNLIKELY (avtpcvfdepay->fragments != NULL)) {
+      GST_WARNING_OBJECT (avtpcvfdepay,
+          "Received starting fragment, but previous one is not complete. Dropping old fragment");
+      gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
+    }
+
+    avtpcvfdepay->fragments =
+        gst_buffer_copy_region (avtpdu, GST_BUFFER_COPY_MEMORY, offset,
+        nalu_size);
+  }
+
+  if (!start && !end) {
+    if (G_UNLIKELY (avtpcvfdepay->fragments == NULL)) {
+      GST_WARNING_OBJECT (avtpcvfdepay,
+          "Received intermediate fragment, but no start fragment received. Dropping it.");
+      gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
+      goto end;
+    }
+    gst_buffer_copy_into (avtpcvfdepay->fragments, avtpdu,
+        GST_BUFFER_COPY_MEMORY, offset, nalu_size);
+  }
+
+  if (end) {
+    ret =
+        gst_avtp_cvf_depay_process_last_fragment (avtpcvfdepay, avtpdu, map,
+        offset, nalu_size, nri, nal_type);
+  }
+
+end:
   return ret;
 }
 
@@ -466,6 +607,12 @@ gst_avtp_cvf_depay_handle_single_nal (GstAvtpCvfDepay * avtpcvfdepay,
 
   GST_DEBUG_OBJECT (avtpcvfdepay, "Handling single NAL unit");
 
+  if (avtpcvfdepay->fragments != NULL) {
+    GST_WARNING_OBJECT (avtpcvfdepay,
+        "Received single NAL unit, but previous fragment is incomplete. Dropping fragment.");
+    gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
+  }
+
   gst_avtp_cvf_depay_get_avtp_timestamps (avtpcvfdepay, map, &pts, &dts);
   gst_avtp_cvf_depay_get_nalu_size (avtpcvfdepay, map, &nalu_size);
   gst_avtp_cvf_depay_get_M (avtpcvfdepay, map, &M);
@@ -513,7 +660,7 @@ gst_avtp_cvf_depay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
           "AVTP aggregation packets not supported, dropping it");
       break;
     case FU_A_TYPE:
-      ret = gst_avtp_cvf_depay_handle_fragment_a (avtpcvfdepay, buffer, &map);
+      ret = gst_avtp_cvf_depay_handle_fu_a (avtpcvfdepay, buffer, &map);
       break;
     case FU_B_TYPE:
       GST_WARNING_OBJECT (avtpcvfdepay,
index 682e188..db14190 100644 (file)
@@ -47,6 +47,7 @@ struct _GstAvtpCvfDepay
 
   guint8 seqnum;
   GstBuffer *out_buffer;
+  GstBuffer *fragments;
 };
 
 struct _GstAvtpCvfDepayClass