rtpbasedepayload: handle caps change partway through buffer list
authorJakub Adam <jakub.adam@collabora.com>
Tue, 9 Feb 2021 21:09:52 +0000 (22:09 +0100)
committerJakub Adam <jakub.adam@collabora.com>
Fri, 12 Mar 2021 17:45:04 +0000 (18:45 +0100)
While preparing a blist for pushing, some RTP header extension may
request caps change for a specific buffer in the list. When this
happens, depayloader should immediately push those buffers from the list
that precede the currently processed buffer (for which the caps change
was requested) and only then apply the new caps to the src pad.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/merge_requests/1011>

gst-libs/gst/rtp/gstrtpbasedepayload.c
tests/check/libs/rtpbasedepayload.c
tests/check/libs/rtpdummyhdrextimpl.c

index edb12a2..b1e12d9 100644 (file)
@@ -60,8 +60,6 @@ struct _GstRTPBaseDepayloadPrivate
   gboolean negotiated;
 
   GstCaps *last_caps;
-  gboolean needs_src_caps_update;
-
   GstEvent *segment_event;
   guint32 segment_seqnum;       /* Note: this is a GstEvent seqnum */
 
@@ -1141,7 +1139,7 @@ gst_rtp_base_depayload_clear_extensions (GstRTPBaseDepayload * rtpbasepayload)
   GST_OBJECT_UNLOCK (rtpbasepayload);
 }
 
-static void
+static gboolean
 read_rtp_header_extensions (GstRTPBaseDepayload * depayload,
     GstBuffer * input, GstBuffer * output)
 {
@@ -1149,15 +1147,16 @@ read_rtp_header_extensions (GstRTPBaseDepayload * depayload,
   guint16 bit_pattern;
   guint8 *pdata;
   guint wordlen;
+  gboolean needs_src_caps_update = FALSE;
 
   if (!input) {
     GST_DEBUG_OBJECT (depayload, "no input buffer");
-    return;
+    return needs_src_caps_update;
   }
 
   if (!gst_rtp_buffer_map (input, GST_MAP_READ, &rtp)) {
     GST_WARNING_OBJECT (depayload, "Failed to map buffer");
-    return;
+    return needs_src_caps_update;
   }
 
   if (gst_rtp_buffer_get_extension_data (&rtp, &bit_pattern, (gpointer) & pdata,
@@ -1244,7 +1243,7 @@ read_rtp_header_extensions (GstRTPBaseDepayload * depayload,
         }
 
         if (gst_rtp_header_extension_wants_update_non_rtp_src_caps (ext)) {
-          depayload->priv->needs_src_caps_update = TRUE;
+          needs_src_caps_update = TRUE;
         }
 
         gst_object_unref (ext);
@@ -1257,32 +1256,33 @@ read_rtp_header_extensions (GstRTPBaseDepayload * depayload,
 
 out:
   gst_rtp_buffer_unmap (&rtp);
+
+  return needs_src_caps_update;
 }
 
 static gboolean
-set_headers (GstBuffer ** buffer, guint idx, GstRTPBaseDepayload * depayload)
+gst_rtp_base_depayload_set_headers (GstRTPBaseDepayload * depayload,
+    GstBuffer * buffer)
 {
   GstRTPBaseDepayloadPrivate *priv = depayload->priv;
   GstClockTime pts, dts, duration;
 
-  *buffer = gst_buffer_make_writable (*buffer);
-
-  pts = GST_BUFFER_PTS (*buffer);
-  dts = GST_BUFFER_DTS (*buffer);
-  duration = GST_BUFFER_DURATION (*buffer);
+  pts = GST_BUFFER_PTS (buffer);
+  dts = GST_BUFFER_DTS (buffer);
+  duration = GST_BUFFER_DURATION (buffer);
 
   /* apply last incoming timestamp and duration to outgoing buffer if
    * not otherwise set. */
   if (!GST_CLOCK_TIME_IS_VALID (pts))
-    GST_BUFFER_PTS (*buffer) = priv->pts;
+    GST_BUFFER_PTS (buffer) = priv->pts;
   if (!GST_CLOCK_TIME_IS_VALID (dts))
-    GST_BUFFER_DTS (*buffer) = priv->dts;
+    GST_BUFFER_DTS (buffer) = priv->dts;
   if (!GST_CLOCK_TIME_IS_VALID (duration))
-    GST_BUFFER_DURATION (*buffer) = priv->duration;
+    GST_BUFFER_DURATION (buffer) = priv->duration;
 
   if (G_UNLIKELY (depayload->priv->discont)) {
     GST_LOG_OBJECT (depayload, "Marking DISCONT on output buffer");
-    GST_BUFFER_FLAG_SET (*buffer, GST_BUFFER_FLAG_DISCONT);
+    GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
     depayload->priv->discont = FALSE;
   }
 
@@ -1293,74 +1293,138 @@ set_headers (GstBuffer ** buffer, guint idx, GstRTPBaseDepayload * depayload)
 
   if (priv->input_buffer) {
     if (priv->source_info)
-      add_rtp_source_meta (*buffer, priv->input_buffer);
+      add_rtp_source_meta (buffer, priv->input_buffer);
 
-    read_rtp_header_extensions (depayload, priv->input_buffer, *buffer);
+    return read_rtp_header_extensions (depayload, priv->input_buffer, buffer);
   }
 
-  return TRUE;
+  return FALSE;
 }
 
 static GstFlowReturn
-gst_rtp_base_depayload_prepare_push (GstRTPBaseDepayload * filter,
+gst_rtp_base_depayload_finish_push (GstRTPBaseDepayload * filter,
     gboolean is_list, gpointer obj)
 {
+  /* if this is the first buffer send a NEWSEGMENT */
+  if (G_UNLIKELY (filter->priv->segment_event)) {
+    gst_pad_push_event (filter->srcpad, filter->priv->segment_event);
+    filter->priv->segment_event = NULL;
+    GST_DEBUG_OBJECT (filter, "Pushed newsegment event on this first buffer");
+  }
+
   if (is_list) {
-    GstBufferList **blist = obj;
-    gst_buffer_list_foreach (*blist, (GstBufferListFunc) set_headers, filter);
+    GstBufferList *blist = obj;
+    return gst_pad_push_list (filter->srcpad, blist);
   } else {
-    GstBuffer **buf = obj;
-    set_headers (buf, 0, filter);
+    GstBuffer *buf = obj;
+    return gst_pad_push (filter->srcpad, buf);
   }
+}
 
-  /* header extensions may want to update src caps */
-  if (G_UNLIKELY (filter->priv->needs_src_caps_update)) {
-    GstCaps *src_caps = gst_pad_get_current_caps (filter->srcpad);
+static gboolean
+gst_rtp_base_depayload_set_src_caps_from_hdrext (GstRTPBaseDepayload * filter)
+{
+  gboolean update_ok = TRUE;
+  GstCaps *src_caps = gst_pad_get_current_caps (filter->srcpad);
 
-    if (src_caps) {
-      GstCaps *new_caps;
-      gboolean update_ok = TRUE;
-      gint i;
+  if (src_caps) {
+    GstCaps *new_caps;
+    gint i;
 
-      new_caps = gst_caps_copy (src_caps);
-      for (i = 0; i < filter->priv->header_exts->len; i++) {
-        GstRTPHeaderExtension *ext;
+    new_caps = gst_caps_copy (src_caps);
+    for (i = 0; i < filter->priv->header_exts->len; i++) {
+      GstRTPHeaderExtension *ext;
 
-        ext = g_ptr_array_index (filter->priv->header_exts, i);
-        update_ok =
-            gst_rtp_header_extension_update_non_rtp_src_caps (ext, new_caps);
+      ext = g_ptr_array_index (filter->priv->header_exts, i);
+      update_ok =
+          gst_rtp_header_extension_update_non_rtp_src_caps (ext, new_caps);
 
-        if (!update_ok) {
-          GST_ELEMENT_ERROR (filter, STREAM, DECODE,
-              ("RTP header extension (%s) could not update src caps",
-                  GST_OBJECT_NAME (ext)), (NULL));
-          break;
-        }
+      if (!update_ok) {
+        GST_ELEMENT_ERROR (filter, STREAM, DECODE,
+            ("RTP header extension (%s) could not update src caps",
+                GST_OBJECT_NAME (ext)), (NULL));
+        break;
       }
+    }
+
+    if (G_UNLIKELY (update_ok && !gst_caps_is_equal (src_caps, new_caps))) {
+      gst_pad_set_caps (filter->srcpad, new_caps);
+    }
 
-      if (G_UNLIKELY (update_ok && !gst_caps_is_equal (src_caps, new_caps))) {
-        gst_pad_set_caps (filter->srcpad, new_caps);
+    gst_caps_unref (src_caps);
+    gst_caps_unref (new_caps);
+  }
+
+  return update_ok;
+}
+
+static GstFlowReturn
+gst_rtp_base_depayload_do_push (GstRTPBaseDepayload * filter, gboolean is_list,
+    gpointer obj)
+{
+  GstFlowReturn res;
+
+  if (is_list) {
+    GstBufferList *blist = obj;
+    guint i;
+    guint first_not_pushed_idx = 0;
+
+    for (i = 0; i < gst_buffer_list_length (blist); ++i) {
+      GstBuffer *buf = gst_buffer_list_get_writable (blist, i);
+
+      if (G_UNLIKELY (gst_rtp_base_depayload_set_headers (filter, buf))) {
+        /* src caps have changed; push the buffers preceding the current one,
+         * then apply the new caps on the src pad */
+        guint j;
+
+        for (j = first_not_pushed_idx; j < i; ++j) {
+          res = gst_rtp_base_depayload_finish_push (filter, FALSE,
+              gst_buffer_ref (gst_buffer_list_get (blist, j)));
+          if (G_UNLIKELY (res != GST_FLOW_OK)) {
+            goto error_list;
+          }
+        }
+        first_not_pushed_idx = i;
+
+        if (!gst_rtp_base_depayload_set_src_caps_from_hdrext (filter)) {
+          res = GST_FLOW_ERROR;
+          goto error_list;
+        }
       }
+    }
 
-      gst_caps_unref (src_caps);
-      gst_caps_unref (new_caps);
+    if (G_LIKELY (first_not_pushed_idx == 0)) {
+      res = gst_rtp_base_depayload_finish_push (filter, TRUE, blist);
+      blist = NULL;
+    } else {
+      for (i = first_not_pushed_idx; i < gst_buffer_list_length (blist); ++i) {
+        res = gst_rtp_base_depayload_finish_push (filter, FALSE,
+            gst_buffer_ref (gst_buffer_list_get (blist, i)));
+        if (G_UNLIKELY (res != GST_FLOW_OK)) {
+          break;
+        }
+      }
+    }
 
-      if (!update_ok) {
-        return GST_FLOW_ERROR;
+  error_list:
+    gst_clear_buffer_list (&blist);
+  } else {
+    GstBuffer *buf = obj;
+    if (G_UNLIKELY (gst_rtp_base_depayload_set_headers (filter, buf))) {
+      if (!gst_rtp_base_depayload_set_src_caps_from_hdrext (filter)) {
+        res = GST_FLOW_ERROR;
+        goto error_buffer;
       }
     }
 
-    filter->priv->needs_src_caps_update = FALSE;
-  }
+    res = gst_rtp_base_depayload_finish_push (filter, FALSE, buf);
+    buf = NULL;
 
-  /* if this is the first buffer send a NEWSEGMENT */
-  if (G_UNLIKELY (filter->priv->segment_event)) {
-    gst_pad_push_event (filter->srcpad, filter->priv->segment_event);
-    filter->priv->segment_event = NULL;
-    GST_DEBUG_OBJECT (filter, "Pushed newsegment event on this first buffer");
+  error_buffer:
+    gst_clear_buffer (&buf);
   }
 
-  return GST_FLOW_OK;
+  return res;
 }
 
 /**
@@ -1381,12 +1445,7 @@ gst_rtp_base_depayload_push (GstRTPBaseDepayload * filter, GstBuffer * out_buf)
 {
   GstFlowReturn res;
 
-  res = gst_rtp_base_depayload_prepare_push (filter, FALSE, &out_buf);
-
-  if (G_LIKELY (res == GST_FLOW_OK))
-    res = gst_pad_push (filter->srcpad, out_buf);
-  else
-    gst_buffer_unref (out_buf);
+  res = gst_rtp_base_depayload_do_push (filter, FALSE, out_buf);
 
   if (res != GST_FLOW_OK)
     filter->priv->process_flow_ret = res;
@@ -1410,12 +1469,7 @@ gst_rtp_base_depayload_push_list (GstRTPBaseDepayload * filter,
 {
   GstFlowReturn res;
 
-  res = gst_rtp_base_depayload_prepare_push (filter, TRUE, &out_list);
-
-  if (G_LIKELY (res == GST_FLOW_OK))
-    res = gst_pad_push_list (filter->srcpad, out_list);
-  else
-    gst_buffer_list_unref (out_list);
+  res = gst_rtp_base_depayload_do_push (filter, TRUE, out_list);
 
   if (res != GST_FLOW_OK)
     filter->priv->process_flow_ret = res;
index b5f566d..464b95f 100644 (file)
@@ -59,6 +59,7 @@ struct _GstRtpDummyDepay
   guint64 rtptime;
 
   GstRtpDummyPushMethod push_method;
+  guint num_buffers_in_blist;
 };
 
 struct _GstRtpDummyDepayClass
@@ -110,6 +111,7 @@ static void
 gst_rtp_dummy_depay_init (GstRtpDummyDepay * depay)
 {
   depay->rtptime = 0;
+  depay->num_buffers_in_blist = 1;
 }
 
 static GstRtpDummyDepay *
@@ -169,7 +171,11 @@ gst_rtp_dummy_depay_process (GstRTPBaseDepayload * depayload, GstBuffer * buf)
       break;
     case GST_RTP_DUMMY_USE_PUSH_LIST_FUNC:{
       GstBufferList *blist = gst_buffer_list_new ();
+      gint i;
       gst_buffer_list_add (blist, outbuf);
+      for (i = 0; i != self->num_buffers_in_blist - 1; ++i) {
+        gst_buffer_list_add (blist, gst_buffer_copy (outbuf));
+      }
       outbuf = NULL;
       gst_rtp_base_depayload_push_list (depayload, blist);
       break;
@@ -1807,6 +1813,70 @@ GST_START_TEST (rtp_base_depayload_caps_request_ignored)
 
 GST_END_TEST;
 
+static GstFlowReturn
+hdr_ext_caps_change_chain_func (GstPad * pad, GstObject * parent,
+    GstBuffer * buffer)
+{
+  GstFlowReturn res;
+  GstCaps *caps;
+  guint val;
+  static guint expected_caps_val = 0;
+
+  res = gst_check_chain_func (pad, parent, buffer);
+  if (res != GST_FLOW_OK) {
+    return res;
+  }
+
+  caps = gst_pad_get_current_caps (pad);
+
+  fail_unless (gst_structure_get_uint (gst_caps_get_structure (caps, 0),
+          "dummy-hdrext-val", &val));
+
+  /* Every fifth buffer increments "dummy-hdrext-val". */
+  if (g_list_length (buffers) % 5 == 1) {
+    expected_caps_val++;
+  }
+
+  fail_unless_equals_int (expected_caps_val, val);
+
+  gst_caps_unref (caps);
+
+  return res;
+}
+
+GST_START_TEST (rtp_base_depayload_hdr_ext_caps_change)
+{
+  GstRTPHeaderExtension *ext;
+  State *state;
+
+  state = create_depayloader ("application/x-rtp", NULL);
+  gst_pad_set_chain_function (state->sinkpad, hdr_ext_caps_change_chain_func);
+
+  ext = rtp_dummy_hdr_ext_new ();
+  gst_rtp_header_extension_set_id (ext, 1);
+
+  GST_RTP_DUMMY_DEPAY (state->element)->push_method =
+      GST_RTP_DUMMY_USE_PUSH_LIST_FUNC;
+  GST_RTP_DUMMY_DEPAY (state->element)->num_buffers_in_blist = 15;
+
+  g_signal_emit_by_name (state->element, "add-extension", ext);
+
+  set_state (state, GST_STATE_PLAYING);
+
+  push_rtp_buffer (state, "pts", 0 * GST_SECOND,
+      "rtptime", G_GUINT64_CONSTANT (0x1234), "seq", 0x4242, "hdrext-1", ext,
+      NULL);
+
+  set_state (state, GST_STATE_NULL);
+
+  validate_buffers_received (15);
+
+  gst_object_unref (ext);
+  destroy_depayloader (state);
+}
+
+GST_END_TEST;
+
 static Suite *
 rtp_basepayloading_suite (void)
 {
@@ -1850,6 +1920,7 @@ rtp_basepayloading_suite (void)
   tcase_add_test (tc_chain, rtp_base_depayload_clear_extensions);
   tcase_add_test (tc_chain, rtp_base_depayload_multiple_exts);
   tcase_add_test (tc_chain, rtp_base_depayload_caps_request_ignored);
+  tcase_add_test (tc_chain, rtp_base_depayload_hdr_ext_caps_change);
 
   return s;
 }
index 379bfe1..e5d09af 100644 (file)
@@ -54,6 +54,7 @@ struct _GstRTPDummyHdrExt
   guint read_count;
   guint write_count;
   guint set_attributes_count;
+  guint caps_field_value;
 
   gchar *direction;
   gchar *attributes;
@@ -85,6 +86,9 @@ gst_rtp_dummy_hdr_ext_set_caps_from_attributes (GstRTPHeaderExtension * ext,
 static gboolean
 gst_rtp_dummy_hdr_ext_set_attributes_from_caps (GstRTPHeaderExtension * ext,
     const GstCaps * caps);
+static gboolean
+gst_rtp_dummy_hdr_ext_update_non_rtp_src_caps (GstRTPHeaderExtension * ext,
+    GstCaps * caps);
 
 static void gst_rtp_dummy_hdr_ext_finalize (GObject * object);
 
@@ -109,6 +113,8 @@ gst_rtp_dummy_hdr_ext_class_init (GstRTPDummyHdrExtClass * klass)
       gst_rtp_dummy_hdr_ext_set_attributes_from_caps;
   gstrtpheaderextension_class->set_caps_from_attributes =
       gst_rtp_dummy_hdr_ext_set_caps_from_attributes;
+  gstrtpheaderextension_class->update_non_rtp_src_caps =
+      gst_rtp_dummy_hdr_ext_update_non_rtp_src_caps;
 
   gobject_class->finalize = gst_rtp_dummy_hdr_ext_finalize;
 
@@ -189,6 +195,11 @@ gst_rtp_dummy_hdr_ext_read (GstRTPHeaderExtension * ext,
 
   dummy->read_count++;
 
+  if (dummy->read_count % 5 == 1) {
+    /* Every fifth buffer triggers caps change. */
+    gst_rtp_header_extension_set_wants_update_non_rtp_src_caps (ext, TRUE);
+  }
+
   return TRUE;
 }
 
@@ -296,3 +307,15 @@ error:
   g_free (new_attrs);
   return FALSE;
 }
+
+static gboolean
+gst_rtp_dummy_hdr_ext_update_non_rtp_src_caps (GstRTPHeaderExtension * ext,
+    GstCaps * caps)
+{
+  GstRTPDummyHdrExt *dummy = GST_RTP_DUMMY_HDR_EXT (ext);
+
+  gst_caps_set_simple (caps, "dummy-hdrext-val", G_TYPE_UINT,
+      ++dummy->caps_field_value, NULL);
+
+  return TRUE;
+}