mpegtsparse: Added alignment property
authorVivia Nikolaidou <vivia@ahiru.eu>
Wed, 22 Jan 2020 11:55:58 +0000 (13:55 +0200)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Wed, 29 Jan 2020 20:39:44 +0000 (20:39 +0000)
alignment works like in mpegtsmux, joining several MpegTS packets into
one buffer. Default value of 0 joins as many as possible for each
incoming buffer, to optimise CPU usage.

gst/mpegtsdemux/mpegtsparse.c
gst/mpegtsdemux/mpegtsparse.h

index fe75812..360bcea 100644 (file)
@@ -39,6 +39,7 @@
 
 #define TABLE_ID_UNSET 0xFF
 #define RUNNING_STATUS_RUNNING 4
+#define SYNC_BYTE 0x47
 
 GST_DEBUG_CATEGORY_STATIC (mpegts_parse_debug);
 #define GST_CAT_DEFAULT mpegts_parse_debug
@@ -64,6 +65,8 @@ struct _MpegTSParsePad
 
   /* the return of the latest push */
   GstFlowReturn flow_return;
+
+  MpegTSParse2Adapter ts_adapter;
 };
 
 static GstStaticPadTemplate src_template =
@@ -84,6 +87,7 @@ enum
   PROP_SET_TIMESTAMPS,
   PROP_SMOOTHING_LATENCY,
   PROP_PCR_PID,
+  PROP_ALIGNMENT,
   /* FILL ME */
 };
 
@@ -121,6 +125,8 @@ static gboolean push_event (MpegTSBase * base, GstEvent * event);
 #define mpegts_parse_parent_class parent_class
 G_DEFINE_TYPE (MpegTSParse2, mpegts_parse, GST_TYPE_MPEGTS_BASE);
 static void mpegts_parse_reset (MpegTSBase * base);
+static GstFlowReturn mpegts_parse_input_done (MpegTSBase * base,
+    GstBuffer * buffer);
 static GstFlowReturn
 drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all);
 
@@ -135,6 +141,17 @@ mpegts_parse_dispose (GObject * object)
 }
 
 static void
+mpegts_parse_finalize (GObject * object)
+{
+  MpegTSParse2 *parse = (MpegTSParse2 *) object;
+
+  gst_adapter_clear (parse->ts_adapter.adapter);
+  g_object_unref (parse->ts_adapter.adapter);
+
+  GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object));
+}
+
+static void
 mpegts_parse_class_init (MpegTSParse2Class * klass)
 {
   GObjectClass *gobject_class = (GObjectClass *) (klass);
@@ -144,6 +161,7 @@ mpegts_parse_class_init (MpegTSParse2Class * klass)
   gobject_class->set_property = mpegts_parse_set_property;
   gobject_class->get_property = mpegts_parse_get_property;
   gobject_class->dispose = mpegts_parse_dispose;
+  gobject_class->finalize = mpegts_parse_finalize;
 
   g_object_class_install_property (gobject_class, PROP_SET_TIMESTAMPS,
       g_param_spec_boolean ("set-timestamps",
@@ -159,6 +177,10 @@ mpegts_parse_class_init (MpegTSParse2Class * klass)
       g_param_spec_int ("pcr-pid", "PID containing PCR",
           "Set the PID to use for PCR values (-1 for auto)",
           -1, G_MAXINT, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_ALIGNMENT,
+      g_param_spec_uint ("alignment", "Alignment",
+          "Number of packets per buffer (padded with dummy packets on EOS) (0 = auto)",
+          0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   element_class = GST_ELEMENT_CLASS (klass);
   element_class->pad_removed = mpegts_parse_pad_removed;
@@ -180,6 +202,7 @@ mpegts_parse_class_init (MpegTSParse2Class * klass)
   ts_class->program_started = GST_DEBUG_FUNCPTR (mpegts_parse_program_started);
   ts_class->program_stopped = GST_DEBUG_FUNCPTR (mpegts_parse_program_stopped);
   ts_class->reset = GST_DEBUG_FUNCPTR (mpegts_parse_reset);
+  ts_class->input_done = GST_DEBUG_FUNCPTR (mpegts_parse_input_done);
   ts_class->inspect_packet = GST_DEBUG_FUNCPTR (mpegts_parse_inspect_packet);
 }
 
@@ -205,6 +228,12 @@ mpegts_parse_init (MpegTSParse2 * parse)
 
   parse->have_group_id = FALSE;
   parse->group_id = G_MAXUINT;
+
+  parse->ts_adapter.adapter = gst_adapter_new ();
+  parse->ts_adapter.packets_in_adapter = 0;
+  parse->alignment = 0;
+  parse->is_eos = FALSE;
+  parse->header = 0;
 }
 
 static void
@@ -248,6 +277,11 @@ mpegts_parse_reset (MpegTSBase * base)
   parse->bytes_since_pcr = 0;
   parse->pcr_pid = parse->user_pcr_pid;
   parse->ts_offset = 0;
+
+  gst_adapter_clear (parse->ts_adapter.adapter);
+  parse->ts_adapter.packets_in_adapter = 0;
+  parse->is_eos = FALSE;
+  parse->header = 0;
 }
 
 static void
@@ -268,6 +302,9 @@ mpegts_parse_set_property (GObject * object, guint prop_id,
     case PROP_PCR_PID:
       parse->pcr_pid = parse->user_pcr_pid = g_value_get_int (value);
       break;
+    case PROP_ALIGNMENT:
+      parse->alignment = g_value_get_uint (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
   }
@@ -289,6 +326,9 @@ mpegts_parse_get_property (GObject * object, guint prop_id,
     case PROP_PCR_PID:
       g_value_set_int (value, parse->pcr_pid);
       break;
+    case PROP_ALIGNMENT:
+      g_value_set_uint (value, parse->alignment);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
   }
@@ -371,8 +411,52 @@ push_event (MpegTSBase * base, GstEvent * event)
     }
     prepare_src_pad (base, parse);
   }
-  if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_EOS))
+  if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_EOS)) {
+    parse->is_eos = TRUE;
+
+    if (parse->alignment > 0 && parse->ts_adapter.packets_in_adapter > 0
+        && parse->ts_adapter.packets_in_adapter < parse->alignment) {
+      GstBuffer *buf;
+      GstMapInfo map;
+      guint8 *data;
+      gint missing_packets =
+          parse->alignment - parse->ts_adapter.packets_in_adapter;
+      gint i = missing_packets;
+      gsize packet_size = base->packetizer->packet_size;
+
+      GST_DEBUG_OBJECT (parse, "Adding %d dummy packets", missing_packets);
+
+      buf = gst_buffer_new_and_alloc (missing_packets * packet_size);
+      gst_buffer_map (buf, &map, GST_MAP_READWRITE);
+      data = map.data;
+
+      g_assert (packet_size > 0);
+
+      for (; i > 0; i--) {
+        gint offset;
+
+        if (packet_size > MPEGTS_NORMAL_PACKETSIZE) {
+          parse->header++;
+          GST_WRITE_UINT32_BE (data, parse->header);
+          offset = 4;
+        } else {
+          offset = 0;
+        }
+        GST_WRITE_UINT8 (data + offset, SYNC_BYTE);
+        /* null packet PID */
+        GST_WRITE_UINT16_BE (data + offset + 1, 0x1FFF);
+        /* no adaptation field exists | continuity counter undefined */
+        GST_WRITE_UINT8 (data + offset + 3, 0x10);
+        /* payload */
+        memset (data + offset + 4, 0, MPEGTS_NORMAL_PACKETSIZE - 4);
+        data += packet_size;
+      }
+      gst_buffer_unmap (buf, &map);
+      gst_adapter_push (parse->ts_adapter.adapter, buf);
+      parse->ts_adapter.packets_in_adapter += missing_packets;
+    }
     drain_pending_buffers (parse, TRUE);
+  }
 
   if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
     parse->ts_offset = 0;
@@ -407,6 +491,8 @@ mpegts_parse_create_tspad (MpegTSParse2 * parse, const gchar * pad_name)
   tspad->program = NULL;
   tspad->pushed = FALSE;
   tspad->flow_return = GST_FLOW_NOT_LINKED;
+  tspad->ts_adapter.adapter = gst_adapter_new ();
+  tspad->ts_adapter.packets_in_adapter = 0;
   gst_pad_set_element_private (pad, tspad);
   gst_flow_combiner_add_pad (parse->flowcombiner, pad);
 
@@ -416,6 +502,9 @@ mpegts_parse_create_tspad (MpegTSParse2 * parse, const gchar * pad_name)
 static void
 mpegts_parse_destroy_tspad (MpegTSParse2 * parse, MpegTSParsePad * tspad)
 {
+  gst_adapter_clear (tspad->ts_adapter.adapter);
+  g_object_unref (tspad->ts_adapter.adapter);
+
   /* free the wrapper */
   g_free (tspad);
 }
@@ -528,6 +617,53 @@ mpegts_parse_release_pad (GstElement * element, GstPad * pad)
 }
 
 static GstFlowReturn
+empty_adapter_into_pad (MpegTSParse2Adapter * ts_adapter, GstPad * pad)
+{
+  GstAdapter *adapter = ts_adapter->adapter;
+  GstBuffer *buf = NULL;
+  GstClockTime ts = gst_adapter_prev_pts (adapter, NULL);
+  gsize avail = gst_adapter_available (adapter);
+  GstFlowReturn ret = GST_FLOW_OK;
+
+  if (avail > 0)
+    buf = gst_adapter_take_buffer (adapter, avail);
+
+  ts_adapter->packets_in_adapter = 0;
+
+  if (buf) {
+    GST_BUFFER_PTS (buf) = ts;
+    ret = gst_pad_push (pad, buf);
+  }
+
+  return ret;
+}
+
+static GstFlowReturn
+enqueue_and_maybe_push_buffer (MpegTSParse2 * parse, GstPad * pad,
+    MpegTSParse2Adapter * ts_adapter, GstBuffer * buffer)
+{
+  GstFlowReturn ret = GST_FLOW_OK;
+
+  if (buffer != NULL) {
+    if (parse->alignment == 1) {
+      ret = gst_pad_push (pad, buffer);
+      ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
+    } else {
+      gst_adapter_push (ts_adapter->adapter, buffer);
+      ts_adapter->packets_in_adapter++;
+
+      if (ts_adapter->packets_in_adapter == parse->alignment
+          && ts_adapter->packets_in_adapter > 0) {
+        ret = empty_adapter_into_pad (ts_adapter, pad);
+        ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
+      }
+    }
+  }
+
+  return ret;
+}
+
+static GstFlowReturn
 mpegts_parse_tspad_push_section (MpegTSParse2 * parse, MpegTSParsePad * tspad,
     GstMpegtsSection * section, MpegTSPacketizerPacket * packet)
 {
@@ -557,8 +693,9 @@ mpegts_parse_tspad_push_section (MpegTSParse2 * parse, MpegTSParsePad * tspad,
 
   if (to_push) {
     GstBuffer *buf = mpegts_packet_to_buffer (packet);
-    ret = gst_pad_push (tspad->pad, buf);
-    ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
+    ret =
+        enqueue_and_maybe_push_buffer (parse, tspad->pad, &tspad->ts_adapter,
+        buf);
   }
 
   GST_LOG_OBJECT (parse, "Returning %s", gst_flow_get_name (ret));
@@ -635,9 +772,6 @@ mpegts_parse_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
   GST_OBJECT_UNLOCK (parse);
 
   buf = mpegts_packet_to_buffer (packet);
-  if (!(packet->afc_flags & MPEGTS_AFC_RANDOM_ACCESS_FLAG)) {
-    gst_buffer_set_flags (buf, GST_BUFFER_FLAG_DELTA_UNIT);
-  }
   ret = mpegts_parse_have_buffer (base, buf);
 
   while (pad && !done) {
@@ -843,10 +977,12 @@ drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all)
     GST_BUFFER_PTS (buffer) = out_ts + parse->ts_offset;
     GST_BUFFER_DTS (buffer) = out_ts + parse->ts_offset;
     if (ret == GST_FLOW_OK) {
-      ret = gst_pad_push (parse->srcpad, buffer);
-      ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
-    } else
+      ret =
+          enqueue_and_maybe_push_buffer (parse, parse->srcpad,
+          &parse->ts_adapter, buffer);
+    } else {
       gst_buffer_unref (buffer);
+    }
 
     /* Free this list node and move to the next */
     p = g_list_previous (l);
@@ -854,6 +990,10 @@ drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all)
     l = p;
   }
 
+  if (parse->is_eos) {
+    empty_adapter_into_pad (&parse->ts_adapter, parse->srcpad);
+  }
+
   parse->pending_buffers = end;
   parse->bytes_since_pcr = bytes_since_pcr;
   parse->previous_pcr = pcr;
@@ -868,6 +1008,19 @@ mpegts_parse_have_buffer (MpegTSBase * base, GstBuffer * buffer)
 
   GST_LOG_OBJECT (parse, "Received buffer %" GST_PTR_FORMAT, buffer);
 
+  /* Assume all packets have equal size */
+  if (parse->alignment > 0 &&
+      base->packetizer->packet_size != MPEGTS_NORMAL_PACKETSIZE) {
+    GstMapInfo map;
+    guint8 *data;
+
+    gst_buffer_map (buffer, &map, GST_MAP_READ);
+    data = map.data;
+
+    parse->header = GST_READ_UINT32_BE (data);
+    gst_buffer_unmap (buffer, &map);
+  }
+
   if (parse->current_pcr != GST_CLOCK_TIME_NONE) {
     GST_DEBUG_OBJECT (parse,
         "InputTS %" GST_TIME_FORMAT " PCR %" GST_TIME_FORMAT,
@@ -895,11 +1048,37 @@ mpegts_parse_have_buffer (MpegTSBase * base, GstBuffer * buffer)
     }
   }
 
-  if (buffer != NULL) {
-    ret = gst_pad_push (parse->srcpad, buffer);
+  ret =
+      enqueue_and_maybe_push_buffer (parse, parse->srcpad, &parse->ts_adapter,
+      buffer);
+
+  return ret;
+}
+
+static void
+empty_pad (GstPad * pad, MpegTSParse2 * parse)
+{
+  MpegTSParsePad *tspad = (MpegTSParsePad *) gst_pad_get_element_private (pad);
+  GstFlowReturn ret;
+
+  ret = empty_adapter_into_pad (&tspad->ts_adapter, tspad->pad);
+  ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
+}
+
+static GstFlowReturn
+mpegts_parse_input_done (MpegTSBase * base, GstBuffer * buffer)
+{
+  MpegTSParse2 *parse = GST_MPEGTS_PARSE (base);
+  GstFlowReturn ret = GST_FLOW_OK;
+
+  if (!prepare_src_pad (base, parse))
+    return GST_FLOW_OK;
+
+  if (parse->alignment == 0) {
+    ret = empty_adapter_into_pad (&parse->ts_adapter, parse->srcpad);
     ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
+    g_list_foreach (parse->srcpads, (GFunc) empty_pad, parse);
   }
-
   return ret;
 }
 
index f89bd4c..dfd221e 100644 (file)
@@ -46,6 +46,11 @@ G_BEGIN_DECLS
 typedef struct _MpegTSParse2 MpegTSParse2;
 typedef struct _MpegTSParse2Class MpegTSParse2Class;
 
+typedef struct _MpegTSParse2Adapter {
+  GstAdapter *adapter;
+  guint packets_in_adapter;
+} MpegTSParse2Adapter;
+
 struct _MpegTSParse2 {
   MpegTSBase parent;
 
@@ -66,7 +71,7 @@ struct _MpegTSParse2 {
   GList *srcpads;
 
   GstFlowCombiner *flowcombiner;
-  
+
   /* state */
   gboolean first;
   gboolean set_timestamps;
@@ -75,6 +80,12 @@ struct _MpegTSParse2 {
   GList *pending_buffers;
   GstClockTime previous_pcr;
   guint bytes_since_pcr;
+
+  /* Combine several packets into a larger buffer */
+  MpegTSParse2Adapter ts_adapter;
+  guint alignment;
+  gboolean is_eos;
+  guint32 header;
 };
 
 struct _MpegTSParse2Class {