irtspparse: handle multiple and incomplete frames
authorOleksandrKvl <oleksandrdvl@gmail.com>
Tue, 2 Jul 2019 11:30:35 +0000 (14:30 +0300)
committerSebastian Dröge <slomo@coaxion.net>
Tue, 2 Jul 2019 13:23:27 +0000 (13:23 +0000)
Interleaved frames can be fragmented between
incoming frames. Thus, we can have multiple
frames within the single input frame, as well as
incomplete frame. Now it preserves parsing
state and handle both situations.

Fixes #991

gst/pcapparse/gstirtspparse.c
gst/pcapparse/gstirtspparse.h

index a1d5989..4d64e36 100644 (file)
@@ -117,12 +117,15 @@ gst_irtsp_parse_class_init (GstIRTSPParseClass * klass)
 static void
 gst_irtsp_parse_reset (GstIRTSPParse * IRTSPParse)
 {
+  IRTSPParse->state = IRTSP_SEARCH_FRAME;
+  IRTSPParse->current_offset = 0;
+  IRTSPParse->discont = FALSE;
 }
 
 static void
 gst_irtsp_parse_init (GstIRTSPParse * IRTSPParse)
 {
-  gst_base_parse_set_min_frame_size (GST_BASE_PARSE (IRTSPParse), 4);
+  gst_base_parse_set_min_frame_size (GST_BASE_PARSE (IRTSPParse), 1);
   gst_irtsp_parse_reset (IRTSPParse);
 }
 
@@ -152,64 +155,110 @@ gst_irtsp_parse_stop (GstBaseParse * parse)
   return TRUE;
 }
 
+static void
+gst_irtsp_set_caps_once (GstBaseParse * parse)
+{
+  if (!gst_pad_has_current_caps (GST_BASE_PARSE_SRC_PAD (parse))) {
+    GstCaps *caps = gst_caps_new_empty_simple ("application/x-rtp");
+    gst_pad_set_caps (GST_BASE_PARSE_SRC_PAD (parse), caps);
+    gst_caps_unref (caps);
+  }
+}
+
 static GstFlowReturn
 gst_irtsp_parse_handle_frame (GstBaseParse * parse,
     GstBaseParseFrame * frame, gint * skipsize)
 {
+  static const guint frame_header_size = sizeof (guint8) * 4;
+  static const guint8 frame_header_magic = 0x24;
   GstIRTSPParse *IRTSPParse = GST_IRTSP_PARSE (parse);
   GstBuffer *buf = frame->buffer;
-  GstByteReader reader;
-  gint off;
   GstMapInfo map;
-  guint framesize;
+  const guint8 *frame_start;
+  guint8 current_channel_id;
+  const guint8 *data;
+  guint data_size;
+  guint flushed_size;
+
+  if (G_UNLIKELY (GST_BUFFER_FLAG_IS_SET (frame->buffer,
+              GST_BUFFER_FLAG_DISCONT))) {
+    IRTSPParse->discont = TRUE;
+  }
 
   gst_buffer_map (buf, &map, GST_MAP_READ);
-  if (G_UNLIKELY (map.size < 4))
-    goto exit;
-
-  gst_byte_reader_init (&reader, map.data, map.size);
-
-  off = gst_byte_reader_masked_scan_uint32 (&reader, 0xffff0000,
-      0x24000000 + (IRTSPParse->channel_id << 16), 0, map.size);
-
-  GST_LOG_OBJECT (parse, "possible sync at buffer offset %d", off);
 
-  /* didn't find anything that looks like a sync word, skip */
-  if (off < 0) {
-    *skipsize = map.size - 3;
-    goto exit;
-  }
+start:
+  g_assert (map.size >= IRTSPParse->current_offset);
+  data = &map.data[IRTSPParse->current_offset];
+  data_size = map.size - IRTSPParse->current_offset;
+
+  switch (IRTSPParse->state) {
+    case IRTSP_SEARCH_FRAME:
+      /* Use the first occurence of 0x24 as a start of interleaved frames.
+       * This 'trick' allows us to parse a dump that doesn't contain RTSP
+       * handshake. It's up to user to provide the data where the first 0x24
+       * is an RTSP frame */
+      frame_start = memchr (data, frame_header_magic, data_size);
+      if (frame_start) {
+        IRTSPParse->state = IRTSP_PARSE_FRAME;
+        IRTSPParse->current_offset += frame_start - data;
+        goto start;
+      } else {
+        IRTSPParse->current_offset += data_size;
+      }
+      break;
+    case IRTSP_PARSE_FRAME:
+      if (data_size > 0 && data[0] != frame_header_magic) {
+        IRTSPParse->state = IRTSP_SEARCH_FRAME;
+        goto start;
+      }
+
+      if (data_size >= frame_header_size) {
+        IRTSPParse->current_offset += frame_header_size;
+        current_channel_id = data[1];
+        IRTSPParse->frame_size = GST_READ_UINT16_BE (&data[2]);
+        if (current_channel_id != IRTSPParse->target_channel_id) {
+          IRTSPParse->state = IRTSP_SKIP_FRAME;
+        } else {
+          IRTSPParse->state = IRTSP_FLUSH_FRAME;
+        }
+        goto start;
+      }
+      break;
+    case IRTSP_SKIP_FRAME:
+      if (data_size >= IRTSPParse->frame_size) {
+        IRTSPParse->current_offset += IRTSPParse->frame_size;
+        IRTSPParse->state = IRTSP_PARSE_FRAME;
+        goto start;
+      }
+      break;
+    case IRTSP_FLUSH_FRAME:
+      if (data_size >= IRTSPParse->frame_size) {
+        gst_irtsp_set_caps_once (parse);
+        gst_buffer_unmap (buf, &map);
 
-  /* possible frame header, but not at offset 0? skip bytes before sync */
-  if (off > 0) {
-    *skipsize = off;
-    goto exit;
-  }
+        frame->out_buffer = gst_buffer_copy (frame->buffer);
+        gst_buffer_resize (frame->out_buffer, IRTSPParse->current_offset,
+            IRTSPParse->frame_size);
 
-  framesize = GST_READ_UINT16_BE (map.data + 2) + 4;
-  GST_LOG_OBJECT (parse, "got frame size %d", framesize);
+        if (G_UNLIKELY (IRTSPParse->discont)) {
+          GST_BUFFER_FLAG_SET (frame->out_buffer, GST_BUFFER_FLAG_DISCONT);
+          IRTSPParse->discont = FALSE;
+        }
 
-  if (!gst_pad_has_current_caps (GST_BASE_PARSE_SRC_PAD (parse))) {
-    GstCaps *caps;
+        flushed_size = IRTSPParse->current_offset + IRTSPParse->frame_size;
+        IRTSPParse->current_offset = 0;
+        IRTSPParse->state = IRTSP_PARSE_FRAME;
 
-    caps = gst_caps_new_empty_simple ("application/x-rtp");
-    gst_pad_set_caps (GST_BASE_PARSE_SRC_PAD (parse), caps);
-    gst_caps_unref (caps);
-  }
+        return gst_base_parse_finish_frame (parse, frame, flushed_size);
+      }
 
-  if (framesize <= map.size) {
-    gst_buffer_unmap (buf, &map);
-    /* HACK HACK skip header.
-     * could also ask baseparse to skip this,
-     * but that would give us a discontinuity for free
-     * which is a bit too much to have on all our packets */
-    frame->out_buffer = gst_buffer_copy (frame->buffer);
-    gst_buffer_resize (frame->out_buffer, 4, -1);
-    GST_BUFFER_FLAG_UNSET (frame->out_buffer, GST_BUFFER_FLAG_DISCONT);
-    return gst_base_parse_finish_frame (parse, frame, framesize);
+      break;
+    default:
+      g_assert_not_reached ();
+      break;
   }
 
-exit:
   gst_buffer_unmap (buf, &map);
   return GST_FLOW_OK;
 }
@@ -222,7 +271,7 @@ gst_irtsp_parse_set_property (GObject * object,
 
   switch (prop_id) {
     case PROP_CHANNEL_ID:
-      IRTSPParse->channel_id = g_value_get_int (value);
+      IRTSPParse->target_channel_id = g_value_get_int (value);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -238,7 +287,7 @@ gst_irtsp_parse_get_property (GObject * object,
 
   switch (prop_id) {
     case PROP_CHANNEL_ID:
-      g_value_set_int (value, IRTSPParse->channel_id);
+      g_value_set_int (value, IRTSPParse->target_channel_id);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
index 27a8e2e..e3d3b74 100644 (file)
@@ -26,7 +26,6 @@
 #include <gst/base/gstbaseparse.h>
 
 G_BEGIN_DECLS
-
 #define GST_TYPE_IRTSP_PARSE \
   (gst_irtsp_parse_get_type())
 #define GST_IRTSP_PARSE(obj) \
@@ -37,20 +36,32 @@ G_BEGIN_DECLS
   (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_IRTSP_PARSE))
 #define GST_IS_IRTSP_PARSE_CLASS(klass) \
   (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_IRTSP_PARSE))
-
 typedef struct _GstIRTSPParse GstIRTSPParse;
 typedef struct _GstIRTSPParseClass GstIRTSPParseClass;
 
+typedef enum
+{
+  IRTSP_SEARCH_FRAME,
+  IRTSP_PARSE_FRAME,
+  IRTSP_FLUSH_FRAME,
+  IRTSP_SKIP_FRAME
+} RtspParserState;
+
 /**
  * GstIRTSPParse:
  *
  * The opaque GstIRTSPParse object
  */
-struct _GstIRTSPParse {
+struct _GstIRTSPParse
+{
   GstBaseParse baseparse;
 
-  guint8 channel_id;
-  /*< private >*/
+  guint8 target_channel_id;
+  /*< private > */
+  RtspParserState state;
+  guint16 frame_size;
+  guint current_offset;
+  gboolean discont;
 };
 
 /**
@@ -59,12 +70,12 @@ struct _GstIRTSPParse {
  *
  * The opaque GstIRTSPParseClass data structure.
  */
-struct _GstIRTSPParseClass {
+struct _GstIRTSPParseClass
+{
   GstBaseParseClass baseparse_class;
 };
 
 GType gst_irtsp_parse_get_type (void);
 
 G_END_DECLS
-
 #endif /* __GST_IRTSP_PARSE_H__ */