avtpcvfdepay: Don't hide gst_pad_push return
authorEderson de Souza <ederson.desouza@intel.com>
Fri, 1 Nov 2019 22:58:47 +0000 (15:58 -0700)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Tue, 19 Nov 2019 13:35:00 +0000 (13:35 +0000)
avtpcvfdepay was effectively hiding any return from gst_pad_push, so no
errors or GST_FLOW_EOS would be propagated upstream.

Tests also added.

ext/avtp/gstavtpcvfdepay.c
tests/check/elements/avtpcvfdepay.c

index 03369ca..6bb9650 100644 (file)
@@ -145,11 +145,12 @@ gst_avtp_cvf_depay_push_caps (GstAvtpCvfDepay * avtpcvfdepay)
   return gst_pad_push_event (avtpbasedepayload->srcpad, event);
 }
 
-static void
+static GstFlowReturn
 gst_avtp_cvf_depay_push (GstAvtpCvfDepay * avtpcvfdepay)
 {
   GstAvtpBaseDepayload *avtpbasedepayload =
       GST_AVTP_BASE_DEPAYLOAD (avtpcvfdepay);
+  GstFlowReturn ret;
 
   if (G_UNLIKELY (!gst_pad_has_current_caps (avtpbasedepayload->srcpad))) {
     guint64 pts_m;
@@ -169,7 +170,7 @@ gst_avtp_cvf_depay_push (GstAvtpCvfDepay * avtpcvfdepay)
 
     if (!gst_avtp_cvf_depay_push_caps (avtpcvfdepay)) {
       GST_ELEMENT_ERROR (avtpcvfdepay, CORE, CAPS, (NULL), (NULL));
-      return;
+      return GST_FLOW_ERROR;
     }
 
     if (!gst_avtp_base_depayload_push_segment_event (avtpbasedepayload,
@@ -231,19 +232,23 @@ gst_avtp_cvf_depay_push (GstAvtpCvfDepay * avtpcvfdepay)
    * next calculations of PTS/DTS won't wrap too early */
   avtpbasedepayload->prev_ptime = GST_BUFFER_DTS (avtpcvfdepay->out_buffer);
 
-  gst_pad_push (GST_AVTP_BASE_DEPAYLOAD (avtpcvfdepay)->srcpad,
+  ret = gst_pad_push (GST_AVTP_BASE_DEPAYLOAD (avtpcvfdepay)->srcpad,
       avtpcvfdepay->out_buffer);
   avtpcvfdepay->out_buffer = NULL;
+
+  return ret;
 }
 
-static void
+static GstFlowReturn
 gst_avtp_cvf_depay_push_and_discard (GstAvtpCvfDepay * avtpcvfdepay)
 {
+  GstFlowReturn ret = GST_FLOW_OK;
+
   /* Push everything we have, hopefully decoder can handle it */
   if (avtpcvfdepay->out_buffer != NULL) {
     GST_DEBUG_OBJECT (avtpcvfdepay, "Pushing incomplete buffers");
 
-    gst_avtp_cvf_depay_push (avtpcvfdepay);
+    ret = gst_avtp_cvf_depay_push (avtpcvfdepay);
   }
 
   /* Discard any incomplete fragments */
@@ -252,11 +257,13 @@ gst_avtp_cvf_depay_push_and_discard (GstAvtpCvfDepay * avtpcvfdepay)
     gst_buffer_unref (avtpcvfdepay->fragments);
     avtpcvfdepay->fragments = NULL;
   }
+
+  return ret;
 }
 
 static gboolean
 gst_avtp_cvf_depay_validate_avtpdu (GstAvtpCvfDepay * avtpcvfdepay,
-    GstMapInfo * map)
+    GstMapInfo * map, gboolean * lost_packet)
 {
   GstAvtpBaseDepayload *avtpbasedepayload =
       GST_AVTP_BASE_DEPAYLOAD (avtpcvfdepay);
@@ -335,6 +342,7 @@ gst_avtp_cvf_depay_validate_avtpdu (GstAvtpCvfDepay * avtpcvfdepay,
     goto end;
   }
 
+  *lost_packet = FALSE;
   r = avtp_cvf_pdu_get (pdu, AVTP_CVF_FIELD_SEQ_NUM, &val);
   g_assert (r == 0);
   if (G_UNLIKELY (val != avtpcvfdepay->seqnum)) {
@@ -345,7 +353,7 @@ gst_avtp_cvf_depay_validate_avtpdu (GstAvtpCvfDepay * avtpcvfdepay,
     avtpcvfdepay->seqnum = val;
     /* This is not a reason to drop the packet, but it may be a good moment
      * to push everything we have - maybe we lost the M packet? */
-    gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
+    *lost_packet = TRUE;
   }
   avtpcvfdepay->seqnum++;
 
@@ -404,10 +412,12 @@ gst_avtp_cvf_depay_get_avtp_timestamps (GstAvtpCvfDepay * avtpcvfdepay,
   }
 }
 
-static void
+static GstFlowReturn
 gst_avtp_cvf_depay_internal_push (GstAvtpCvfDepay * avtpcvfdepay,
     GstBuffer * buffer, gboolean M)
 {
+  GstFlowReturn ret = GST_FLOW_OK;
+
   GST_LOG_OBJECT (avtpcvfdepay,
       "Adding buffer of size %lu (nalu size %lu) to out_buffer",
       gst_buffer_get_size (buffer),
@@ -422,8 +432,10 @@ gst_avtp_cvf_depay_internal_push (GstAvtpCvfDepay * avtpcvfdepay,
 
   /* We only truly push to decoder when we get the last video buffer */
   if (M) {
-    gst_avtp_cvf_depay_push (avtpcvfdepay);
+    ret = gst_avtp_cvf_depay_push (avtpcvfdepay);
   }
+
+  return ret;
 }
 
 static void
@@ -507,7 +519,7 @@ gst_avtp_cvf_depay_process_last_fragment (GstAvtpCvfDepay * avtpcvfdepay,
   GST_BUFFER_DTS (nal) = dts;
 
   gst_avtp_cvf_depay_get_M (avtpcvfdepay, map, &M);
-  gst_avtp_cvf_depay_internal_push (avtpcvfdepay, nal, M);
+  ret = gst_avtp_cvf_depay_internal_push (avtpcvfdepay, nal, M);
 
   avtpcvfdepay->fragments = NULL;
 
@@ -530,7 +542,7 @@ gst_avtp_cvf_depay_handle_fu_a (GstAvtpCvfDepay * avtpcvfdepay,
     GST_ERROR_OBJECT (avtpcvfdepay,
         "Buffer too small to contain fragment headers, size: %lu",
         map->size - AVTP_CVF_H264_HEADER_SIZE);
-    gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
+    ret = gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
     goto end;
   }
 
@@ -552,7 +564,7 @@ gst_avtp_cvf_depay_handle_fu_a (GstAvtpCvfDepay * avtpcvfdepay,
   if (G_UNLIKELY (start && end)) {
     GST_ERROR_OBJECT (avtpcvfdepay,
         "Invalid fragment header - 'start' and 'end' bits set");
-    gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
+    ret = gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
     goto end;
   }
 
@@ -566,7 +578,9 @@ gst_avtp_cvf_depay_handle_fu_a (GstAvtpCvfDepay * avtpcvfdepay,
     if (G_UNLIKELY (avtpcvfdepay->fragments != NULL)) {
       GST_DEBUG_OBJECT (avtpcvfdepay,
           "Received starting fragment, but previous one is not complete. Dropping old fragment");
-      gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
+      ret = gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
+      if (ret != GST_FLOW_OK)
+        goto end;
     }
 
     avtpcvfdepay->fragments =
@@ -578,7 +592,7 @@ gst_avtp_cvf_depay_handle_fu_a (GstAvtpCvfDepay * avtpcvfdepay,
     if (G_UNLIKELY (avtpcvfdepay->fragments == NULL)) {
       GST_DEBUG_OBJECT (avtpcvfdepay,
           "Received intermediate fragment, but no start fragment received. Dropping it.");
-      gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
+      ret = gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
       goto end;
     }
     gst_buffer_copy_into (avtpcvfdepay->fragments, avtpdu,
@@ -608,9 +622,13 @@ gst_avtp_cvf_depay_handle_single_nal (GstAvtpCvfDepay * avtpcvfdepay,
   GST_DEBUG_OBJECT (avtpcvfdepay, "Handling single NAL unit");
 
   if (avtpcvfdepay->fragments != NULL) {
+    GstFlowReturn ret;
+
     GST_DEBUG_OBJECT (avtpcvfdepay,
         "Received single NAL unit, but previous fragment is incomplete. Dropping fragment.");
-    gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
+    ret = gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
+    if (ret != GST_FLOW_OK)
+      return ret;
   }
 
   gst_avtp_cvf_depay_get_avtp_timestamps (avtpcvfdepay, map, &pts, &dts);
@@ -632,9 +650,7 @@ gst_avtp_cvf_depay_handle_single_nal (GstAvtpCvfDepay * avtpcvfdepay,
   GST_BUFFER_PTS (nal) = pts;
   GST_BUFFER_DTS (nal) = dts;
 
-  gst_avtp_cvf_depay_internal_push (avtpcvfdepay, nal, M);
-
-  return GST_FLOW_OK;
+  return gst_avtp_cvf_depay_internal_push (avtpcvfdepay, nal, M);
 }
 
 static GstFlowReturn
@@ -642,14 +658,20 @@ gst_avtp_cvf_depay_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
 {
   GstAvtpCvfDepay *avtpcvfdepay = GST_AVTP_CVF_DEPAY (parent);
   GstFlowReturn ret = GST_FLOW_OK;
+  gboolean lost_packet;
   GstMapInfo map;
 
   gst_buffer_map (buffer, &map, GST_MAP_READ);
 
-  if (!gst_avtp_cvf_depay_validate_avtpdu (avtpcvfdepay, &map)) {
+  if (!gst_avtp_cvf_depay_validate_avtpdu (avtpcvfdepay, &map, &lost_packet)) {
     GST_DEBUG_OBJECT (avtpcvfdepay, "Invalid AVTPDU buffer, dropping it");
     goto end;
   }
+  if (lost_packet) {
+    ret = gst_avtp_cvf_depay_push_and_discard (avtpcvfdepay);
+    if (ret != GST_FLOW_OK)
+      goto end;
+  }
 
   switch (gst_avtp_cvf_depay_get_nal_type (&map)) {
     case STAP_A_TYPE:
index 54e049f..c77f1c9 100644 (file)
@@ -96,6 +96,277 @@ fetch_nal (GstBuffer * buffer, gsize * offset)
   return ret;
 }
 
+GST_START_TEST (test_depayloader_fragment_and_single)
+{
+  GstHarness *h;
+  GstBuffer *in;
+  const gint DATA_LEN = sizeof (guint32) + 10;
+  struct avtp_stream_pdu *pdu;
+  GstMapInfo map;
+
+  /* Create the harness for the avtpcvfpay */
+  h = gst_harness_new_parse ("avtpcvfdepay ! fakesink num-buffers=1");
+  gst_harness_set_src_caps_str (h, "application/x-avtp");
+
+  /* Create the input AVTPDU */
+  in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 10);
+  gst_buffer_map (in, &map, GST_MAP_READWRITE);
+  pdu = (struct avtp_stream_pdu *) map.data;
+
+  /* Start with a single NAL */
+  avtp_cvf_pdu_init (pdu, AVTP_CVF_FORMAT_SUBTYPE_H264);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_STREAM_ID, STREAM_ID);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_TV, 0);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_M, 0);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_TIMESTAMP, 0);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_H264_PTV, 1);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_H264_TIMESTAMP, 2000000);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_STREAM_DATA_LEN, DATA_LEN);
+  map.data[AVTP_CVF_H264_HEADER_SIZE] = 0x1;
+  gst_buffer_unmap (in, &map);
+
+  /* We push a copy so that we can change only what is necessary on our buffer */
+  fail_unless_equals_int (gst_harness_push (h, gst_buffer_copy (in)),
+      GST_FLOW_OK);
+  fail_unless (gst_harness_try_pull (h) == NULL);
+
+  /* Then a fragment */
+  gst_buffer_map (in, &map, GST_MAP_READWRITE);
+  pdu = (struct avtp_stream_pdu *) map.data;
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_SEQ_NUM, 1);
+  map.data[AVTP_CVF_H264_HEADER_SIZE] = 3 << 5 | 28;    /* NAL type FU-A, NRI 3 */
+  map.data[AVTP_CVF_H264_HEADER_SIZE + 1] = (1 << 7) | 4;       /* S = 1, type 4 */
+  gst_buffer_unmap (in, &map);
+
+  fail_unless_equals_int (gst_harness_push (h, gst_buffer_copy (in)),
+      GST_FLOW_OK);
+
+  /* Third and last AVTPDU, again a single NAL */
+  gst_buffer_map (in, &map, GST_MAP_READWRITE);
+  pdu = (struct avtp_stream_pdu *) map.data;
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_TV, 1);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_TIMESTAMP, 1000000);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_M, 1);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_SEQ_NUM, 2);
+  map.data[AVTP_CVF_H264_HEADER_SIZE] = 0x1;
+  gst_buffer_unmap (in, &map);
+
+  fail_unless_equals_int (gst_harness_push (h, gst_buffer_copy (in)),
+      GST_FLOW_EOS);
+
+  gst_buffer_unref (in);
+  gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_depayloader_fragmented_two_start_eos)
+{
+  GstHarness *h;
+  GstBuffer *in;
+  const gint DATA_LEN = sizeof (guint32) + 10;
+  struct avtp_stream_pdu *pdu;
+  GstMapInfo map;
+
+  /* Create the harness for the avtpcvfpay */
+  h = gst_harness_new_parse ("avtpcvfdepay ! fakesink num-buffers=1");
+  gst_harness_set_src_caps_str (h, "application/x-avtp");
+
+  /* Create the input AVTPDU */
+  in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 10);
+  gst_buffer_map (in, &map, GST_MAP_READWRITE);
+  pdu = (struct avtp_stream_pdu *) map.data;
+
+  /* Start with a single NAL */
+  avtp_cvf_pdu_init (pdu, AVTP_CVF_FORMAT_SUBTYPE_H264);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_STREAM_ID, STREAM_ID);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_TV, 0);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_M, 0);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_TIMESTAMP, 0);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_H264_PTV, 1);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_H264_TIMESTAMP, 2000000);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_STREAM_DATA_LEN, DATA_LEN);
+  map.data[AVTP_CVF_H264_HEADER_SIZE] = 0x1;
+  gst_buffer_unmap (in, &map);
+
+  /* We push a copy so that we can change only what is necessary on our buffer */
+  fail_unless_equals_int (gst_harness_push (h, gst_buffer_copy (in)),
+      GST_FLOW_OK);
+  fail_unless (gst_harness_try_pull (h) == NULL);
+
+  /* Then a fragment */
+  gst_buffer_map (in, &map, GST_MAP_READWRITE);
+  pdu = (struct avtp_stream_pdu *) map.data;
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_SEQ_NUM, 1);
+  map.data[AVTP_CVF_H264_HEADER_SIZE] = 3 << 5 | 28;    /* NAL type FU-A, NRI 3 */
+  map.data[AVTP_CVF_H264_HEADER_SIZE + 1] = (1 << 7) | 4;       /* S = 1, type 4 */
+  gst_buffer_unmap (in, &map);
+
+  fail_unless_equals_int (gst_harness_push (h, gst_buffer_copy (in)),
+      GST_FLOW_OK);
+
+  /* Third and last AVTPDU, another fragment with start bit set */
+  gst_buffer_map (in, &map, GST_MAP_READWRITE);
+  pdu = (struct avtp_stream_pdu *) map.data;
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_TV, 1);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_TIMESTAMP, 1000000);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_M, 1);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_SEQ_NUM, 2);
+  map.data[AVTP_CVF_H264_HEADER_SIZE + 1] = (1 << 7) | 4;       /* S = 1, type 4 */
+  fill_nal (&map.data[AVTP_CVF_H264_HEADER_SIZE + 2], 8, 16);
+  gst_buffer_unmap (in, &map);
+
+  fail_unless_equals_int (gst_harness_push (h, gst_buffer_copy (in)),
+      GST_FLOW_EOS);
+
+  gst_buffer_unref (in);
+  gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_depayloader_multiple_lost_eos)
+{
+  GstHarness *h;
+  GstBuffer *in;
+  const gint DATA_LEN = sizeof (guint32) + 4;
+  struct avtp_stream_pdu *pdu;
+  GstMapInfo map;
+
+  /* Create the harness for the avtpcvfpay */
+  h = gst_harness_new_parse ("avtpcvfdepay ! fakesink num-buffers=1");
+  gst_harness_set_src_caps_str (h, "application/x-avtp");
+
+  /* Create the input AVTPDU header */
+  in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 4);
+  gst_buffer_map (in, &map, GST_MAP_READWRITE);
+  pdu = (struct avtp_stream_pdu *) map.data;
+
+  avtp_cvf_pdu_init (pdu, AVTP_CVF_FORMAT_SUBTYPE_H264);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_STREAM_ID, STREAM_ID);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_TV, 1);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_M, 0);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_TIMESTAMP, 1000000);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_H264_PTV, 1);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_H264_TIMESTAMP, 2000000);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_STREAM_DATA_LEN, DATA_LEN);
+  map.data[AVTP_CVF_H264_HEADER_SIZE] = 0x7;    /* Add NAL type */
+  fill_nal (&map.data[AVTP_CVF_H264_HEADER_SIZE + 1], 3, 0);
+  gst_buffer_unmap (in, &map);
+
+  /* We push a copy so that we can change only what is necessary on our buffer */
+  gst_harness_push (h, gst_buffer_copy (in));
+  fail_unless (gst_harness_try_pull (h) == NULL);
+
+  /* Send second AVTPDU, but skipping one seqnum */
+  gst_buffer_map (in, &map, GST_MAP_READWRITE);
+  pdu = (struct avtp_stream_pdu *) map.data;
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_SEQ_NUM, 2);
+  map.data[AVTP_CVF_H264_HEADER_SIZE] = 0x1;    /* Add NAL type */
+  fill_nal (&map.data[AVTP_CVF_H264_HEADER_SIZE + 1], 3, 0);
+  gst_buffer_unmap (in, &map);
+
+  fail_unless_equals_int (gst_harness_push (h, gst_buffer_copy (in)),
+      GST_FLOW_EOS);
+
+  gst_buffer_unref (in);
+  gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_depayloader_fragmented_eos)
+{
+  GstHarness *h;
+  GstBuffer *in;
+  const gint DATA_LEN = sizeof (guint32) + 10;
+  struct avtp_stream_pdu *pdu;
+  GstMapInfo map;
+
+  /* Create the harness for the avtpcvfpay */
+  h = gst_harness_new_parse ("avtpcvfdepay ! fakesink num-buffers=1");
+  gst_harness_set_src_caps_str (h, "application/x-avtp");
+
+  /* Create the input AVTPDU */
+  in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 10);
+  gst_buffer_map (in, &map, GST_MAP_READWRITE);
+  pdu = (struct avtp_stream_pdu *) map.data;
+
+  avtp_cvf_pdu_init (pdu, AVTP_CVF_FORMAT_SUBTYPE_H264);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_STREAM_ID, STREAM_ID);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_TV, 0);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_M, 0);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_TIMESTAMP, 0);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_H264_PTV, 1);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_H264_TIMESTAMP, 2000000);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_STREAM_DATA_LEN, DATA_LEN);
+  map.data[AVTP_CVF_H264_HEADER_SIZE] = 3 << 5 | 28;    /* NAL type FU-A, NRI 3 */
+  map.data[AVTP_CVF_H264_HEADER_SIZE + 1] = (1 << 7) | 4;       /* S = 1, type 4 */
+  fill_nal (&map.data[AVTP_CVF_H264_HEADER_SIZE + 2], 8, 0);
+  gst_buffer_unmap (in, &map);
+
+  /* We push a copy so that we can change only what is necessary on our buffer */
+  fail_unless_equals_int (gst_harness_push (h, gst_buffer_copy (in)),
+      GST_FLOW_OK);
+  fail_unless (gst_harness_try_pull (h) == NULL);
+
+  /* Send second and last AVTPDU */
+  gst_buffer_map (in, &map, GST_MAP_READWRITE);
+  pdu = (struct avtp_stream_pdu *) map.data;
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_TV, 1);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_TIMESTAMP, 1000000);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_M, 1);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_SEQ_NUM, 1);
+  map.data[AVTP_CVF_H264_HEADER_SIZE + 1] = (1 << 6) | 4;       /* E = 1, type 4 */
+  fill_nal (&map.data[AVTP_CVF_H264_HEADER_SIZE + 2], 8, 16);
+  gst_buffer_unmap (in, &map);
+
+  fail_unless_equals_int (gst_harness_push (h, gst_buffer_copy (in)),
+      GST_FLOW_EOS);
+
+  gst_buffer_unref (in);
+  gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
+/* Tests a big fragmented NAL scenario */
+GST_START_TEST (test_depayloader_single_eos)
+{
+  GstHarness *h;
+  GstBuffer *in;
+  const gint DATA_LEN = sizeof (guint32) + 4;
+  struct avtp_stream_pdu *pdu;
+  GstMapInfo map;
+
+  /* Create the harness for the avtpcvfpay */
+  h = gst_harness_new_parse ("avtpcvfdepay ! fakesink num-buffers=1");
+  gst_harness_set_src_caps_str (h, "application/x-avtp");
+
+  /* Create the input AVTPDU header */
+  in = gst_harness_create_buffer (h, AVTP_CVF_H264_HEADER_SIZE + 4);
+  gst_buffer_map (in, &map, GST_MAP_READWRITE);
+  pdu = (struct avtp_stream_pdu *) map.data;
+
+  avtp_cvf_pdu_init (pdu, AVTP_CVF_FORMAT_SUBTYPE_H264);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_STREAM_ID, STREAM_ID);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_TV, 1);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_M, 1);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_TIMESTAMP, 1000000);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_H264_PTV, 1);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_H264_TIMESTAMP, 2000000);
+  avtp_cvf_pdu_set (pdu, AVTP_CVF_FIELD_STREAM_DATA_LEN, DATA_LEN);
+  map.data[AVTP_CVF_H264_HEADER_SIZE] = 0x1;    /* Add NAL type */
+  fill_nal (&map.data[AVTP_CVF_H264_HEADER_SIZE + 1], 3, 0);
+  gst_buffer_unmap (in, &map);
+
+  fail_unless_equals_int (gst_harness_push (h, in), GST_FLOW_EOS);
+
+  gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
 GST_START_TEST (test_depayloader_invalid_avtpdu)
 {
   GstHarness *h;
@@ -1198,6 +1469,11 @@ avtpcvfdepay_suite (void)
   tcase_add_test (tc_chain, test_depayloader_single_and_messed_fragments_2);
   tcase_add_test (tc_chain, test_depayloader_single_and_messed_fragments_3);
   tcase_add_test (tc_chain, test_depayloader_invalid_avtpdu);
+  tcase_add_test (tc_chain, test_depayloader_single_eos);
+  tcase_add_test (tc_chain, test_depayloader_fragmented_eos);
+  tcase_add_test (tc_chain, test_depayloader_fragmented_two_start_eos);
+  tcase_add_test (tc_chain, test_depayloader_multiple_lost_eos);
+  tcase_add_test (tc_chain, test_depayloader_fragment_and_single);
 
   return s;
 }