rtmp2: Add support for AGGREGATE messages
authorJan Alexander Steffens (heftig) <jan.steffens@ltnglobal.com>
Mon, 29 Jun 2020 17:47:16 +0000 (19:47 +0200)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Wed, 1 Jul 2020 18:33:42 +0000 (18:33 +0000)
They're multiple frames (tags) of FLV data wrapped into a message.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1384>

gst/rtmp2/rtmp/rtmpconnection.c

index df56b15..b9fe767 100644 (file)
@@ -105,6 +105,8 @@ static void gst_rtmp_connection_start_read (GstRtmpConnection * sc,
     guint needed_bytes);
 static void gst_rtmp_connection_try_read (GstRtmpConnection * sc);
 static void gst_rtmp_connection_do_read (GstRtmpConnection * sc);
+static void gst_rtmp_connection_handle_aggregate (GstRtmpConnection *
+    connection, GstBuffer * buffer);
 static void gst_rtmp_connection_handle_protocol_control (GstRtmpConnection *
     connection, GstBuffer * buffer);
 static void gst_rtmp_connection_handle_cm (GstRtmpConnection * connection,
@@ -699,6 +701,10 @@ gst_rtmp_connection_handle_message (GstRtmpConnection * sc, GstBuffer * buffer)
       gst_rtmp_connection_handle_cm (sc, buffer);
       return;
 
+    case GST_RTMP_MESSAGE_TYPE_AGGREGATE:
+      gst_rtmp_connection_handle_aggregate (sc, buffer);
+      break;
+
     default:
       if (sc->input_handler) {
         sc->input_handler (sc, buffer, sc->input_handler_user_data);
@@ -708,6 +714,83 @@ gst_rtmp_connection_handle_message (GstRtmpConnection * sc, GstBuffer * buffer)
 }
 
 static void
+gst_rtmp_connection_handle_aggregate (GstRtmpConnection * connection,
+    GstBuffer * buffer)
+{
+  GstRtmpMeta *meta;
+  GstMapInfo map;
+  gsize pos = 0;
+  guint32 first_ts = 0;
+
+  meta = gst_buffer_get_rtmp_meta (buffer);
+  g_return_if_fail (meta);
+
+  gst_buffer_map (buffer, &map, GST_MAP_READ);
+  GST_TRACE_OBJECT (connection, "got aggregate message");
+
+  /* Parse Aggregate Messages as described in rtmp_specification_1.0.pdf page 26
+   * The payload is part of a FLV file.
+   *
+   * WARNING: This spec defines the payload to use an "RTMP message format"
+   * which misidentifies the format of the timestamps and omits the size of the
+   * backpointers. */
+
+  while (pos < map.size) {
+    gsize remaining = map.size - pos;
+    GstBuffer *submessage;
+    GstRtmpMeta *submeta;
+    GstRtmpFlvTagHeader header;
+
+    if (!gst_rtmp_flv_tag_parse_header (&header, map.data + pos, remaining)) {
+      GST_ERROR_OBJECT (connection,
+          "aggregate contains incomplete header; want %d, got %" G_GSIZE_FORMAT,
+          GST_RTMP_FLV_TAG_HEADER_SIZE, remaining);
+      break;
+    }
+
+    if (remaining < header.total_size) {
+      GST_ERROR_OBJECT (connection,
+          "aggregate contains incomplete message; want %" G_GSIZE_FORMAT
+          ", got %" G_GSIZE_FORMAT, header.total_size, remaining);
+      break;
+    }
+
+    submessage = gst_buffer_copy_region (buffer, GST_BUFFER_COPY_FLAGS |
+        GST_BUFFER_COPY_META | GST_BUFFER_COPY_MEMORY,
+        pos + GST_RTMP_FLV_TAG_HEADER_SIZE, header.payload_size);
+
+    GST_BUFFER_DTS (submessage) = GST_BUFFER_DTS (buffer);
+    GST_BUFFER_OFFSET (submessage) = GST_BUFFER_OFFSET (buffer) + pos;
+    GST_BUFFER_OFFSET_END (submessage) =
+        GST_BUFFER_OFFSET (submessage) + header.total_size;
+
+    submeta = gst_buffer_get_rtmp_meta (submessage);
+    g_assert (submeta);
+
+    submeta->type = header.type;
+    submeta->size = header.payload_size;
+
+    if (pos == 0) {
+      first_ts = header.timestamp;
+    } else {
+      guint32 ts_offset = header.timestamp - first_ts;
+
+      submeta->ts_delta += ts_offset;
+      GST_BUFFER_DTS (submessage) += ts_offset * GST_MSECOND;
+      GST_BUFFER_FLAG_UNSET (submessage, GST_BUFFER_FLAG_DISCONT);
+    }
+
+    gst_rtmp_buffer_dump (submessage, "<<< submessage");
+    gst_rtmp_connection_handle_message (connection, submessage);
+    gst_buffer_unref (submessage);
+
+    pos += header.total_size;
+  }
+
+  gst_buffer_unmap (buffer, &map);
+}
+
+static void
 gst_rtmp_connection_handle_protocol_control (GstRtmpConnection * connection,
     GstBuffer * buffer)
 {