tsmux: Ability for streams to disappear and reappear
authorVivia Nikolaidou <vivia@ahiru.eu>
Fri, 10 Apr 2020 16:54:31 +0000 (19:54 +0300)
committerGStreamer Merge Bot <gitlab-merge-bot@gstreamer-foundation.org>
Wed, 15 Apr 2020 09:07:24 +0000 (09:07 +0000)
Until now, any streams in tsmux had to be present when the element
started its first buffer. Now they can appear at any point during the
stream, or even disappear and reappear later using the same PID.

gst/mpegtsmux/gstbasetsmux.c
gst/mpegtsmux/tsmux/tsmux.c
gst/mpegtsmux/tsmux/tsmux.h
gst/mpegtsmux/tsmux/tsmuxstream.c
gst/mpegtsmux/tsmux/tsmuxstream.h
tests/check/elements/mpegtsmux.c

index ca551fb..887b251 100644 (file)
@@ -688,89 +688,78 @@ not_negotiated:
 }
 
 static GstFlowReturn
-gst_base_ts_mux_create_streams (GstBaseTsMux * mux)
+gst_base_ts_mux_create_pad_stream (GstBaseTsMux * mux, GstPad * pad)
 {
+  GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad);
+  gchar *name = NULL;
+  gchar *pcr_name;
   GstFlowReturn ret = GST_FLOW_OK;
-  GList *walk = GST_ELEMENT (mux)->sinkpads;
 
-  /* Create the streams */
-  while (walk) {
-    GstPad *pad = GST_PAD (walk->data);
-    GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (walk->data);
-    gchar *name = NULL;
-    gchar *pcr_name;
-
-    walk = g_list_next (walk);
-
-    if (ts_pad->prog_id == -1) {
-      name = GST_PAD_NAME (pad);
-      if (mux->prog_map != NULL && gst_structure_has_field (mux->prog_map,
-              name)) {
-        gint idx;
-        gboolean ret = gst_structure_get_int (mux->prog_map, name, &idx);
-        if (!ret) {
-          GST_ELEMENT_ERROR (mux, STREAM, MUX,
-              ("Reading program map failed. Assuming default"), (NULL));
-          idx = DEFAULT_PROG_ID;
-        }
-        if (idx < 0) {
-          GST_DEBUG_OBJECT (mux, "Program number %d associate with pad %s less "
-              "than zero; DEFAULT_PROGRAM = %d is used instead",
-              idx, name, DEFAULT_PROG_ID);
-          idx = DEFAULT_PROG_ID;
-        }
-        ts_pad->prog_id = idx;
-      } else {
-        ts_pad->prog_id = DEFAULT_PROG_ID;
+  if (ts_pad->prog_id == -1) {
+    name = GST_PAD_NAME (pad);
+    if (mux->prog_map != NULL && gst_structure_has_field (mux->prog_map, name)) {
+      gint idx;
+      gboolean ret = gst_structure_get_int (mux->prog_map, name, &idx);
+      if (!ret) {
+        GST_ELEMENT_ERROR (mux, STREAM, MUX,
+            ("Reading program map failed. Assuming default"), (NULL));
+        idx = DEFAULT_PROG_ID;
+      }
+      if (idx < 0) {
+        GST_DEBUG_OBJECT (mux, "Program number %d associate with pad %s less "
+            "than zero; DEFAULT_PROGRAM = %d is used instead",
+            idx, name, DEFAULT_PROG_ID);
+        idx = DEFAULT_PROG_ID;
       }
+      ts_pad->prog_id = idx;
+    } else {
+      ts_pad->prog_id = DEFAULT_PROG_ID;
     }
+  }
 
-    ts_pad->prog =
-        (TsMuxProgram *) g_hash_table_lookup (mux->programs,
-        GINT_TO_POINTER (ts_pad->prog_id));
-    if (ts_pad->prog == NULL) {
-      ts_pad->prog = tsmux_program_new (mux->tsmux, ts_pad->prog_id);
-      if (ts_pad->prog == NULL)
-        goto no_program;
-      tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
-      tsmux_program_set_scte35_pid (ts_pad->prog, mux->scte35_pid);
-      tsmux_program_set_scte35_interval (ts_pad->prog,
-          mux->scte35_null_interval);
-      g_hash_table_insert (mux->programs, GINT_TO_POINTER (ts_pad->prog_id),
-          ts_pad->prog);
-    }
+  ts_pad->prog =
+      (TsMuxProgram *) g_hash_table_lookup (mux->programs,
+      GINT_TO_POINTER (ts_pad->prog_id));
+  if (ts_pad->prog == NULL) {
+    ts_pad->prog = tsmux_program_new (mux->tsmux, ts_pad->prog_id);
+    if (ts_pad->prog == NULL)
+      goto no_program;
+    tsmux_set_pmt_interval (ts_pad->prog, mux->pmt_interval);
+    tsmux_program_set_scte35_pid (ts_pad->prog, mux->scte35_pid);
+    tsmux_program_set_scte35_interval (ts_pad->prog, mux->scte35_null_interval);
+    g_hash_table_insert (mux->programs, GINT_TO_POINTER (ts_pad->prog_id),
+        ts_pad->prog);
+  }
 
-    if (ts_pad->stream == NULL) {
-      ret = gst_base_ts_mux_create_stream (mux, ts_pad);
-      if (ret != GST_FLOW_OK)
-        goto no_stream;
-    }
+  if (ts_pad->stream == NULL) {
+    ret = gst_base_ts_mux_create_stream (mux, ts_pad);
+    if (ret != GST_FLOW_OK)
+      goto no_stream;
+  }
 
-    if (ts_pad->prog->pcr_stream == NULL) {
-      /* Take the first stream of the program for the PCR */
-      GST_DEBUG_OBJECT (ts_pad,
-          "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
-          ts_pad->pid, ts_pad->prog_id);
+  if (ts_pad->prog->pcr_stream == NULL) {
+    /* Take the first stream of the program for the PCR */
+    GST_DEBUG_OBJECT (ts_pad,
+        "Use stream (pid=%d) from pad as PCR for program (prog_id = %d)",
+        ts_pad->pid, ts_pad->prog_id);
 
-      tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
-    }
+    tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
+  }
 
-    /* Check for user-specified PCR PID */
-    pcr_name = g_strdup_printf ("PCR_%d", ts_pad->prog->pgm_number);
-    if (mux->prog_map && gst_structure_has_field (mux->prog_map, pcr_name)) {
-      const gchar *sink_name =
-          gst_structure_get_string (mux->prog_map, pcr_name);
+  /* Check for user-specified PCR PID */
+  pcr_name = g_strdup_printf ("PCR_%d", ts_pad->prog->pgm_number);
+  if (mux->prog_map && gst_structure_has_field (mux->prog_map, pcr_name)) {
+    const gchar *sink_name = gst_structure_get_string (mux->prog_map, pcr_name);
 
-      if (!g_strcmp0 (name, sink_name)) {
-        GST_DEBUG_OBJECT (mux, "User specified stream (pid=%d) as PCR for "
-            "program (prog_id = %d)", ts_pad->pid, ts_pad->prog->pgm_number);
-        tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
-      }
+    if (!g_strcmp0 (name, sink_name)) {
+      GST_DEBUG_OBJECT (mux, "User specified stream (pid=%d) as PCR for "
+          "program (prog_id = %d)", ts_pad->pid, ts_pad->prog->pgm_number);
+      tsmux_program_set_pcr_stream (ts_pad->prog, ts_pad->stream);
     }
-    g_free (pcr_name);
   }
+  g_free (pcr_name);
 
-  return GST_FLOW_OK;
+  return ret;
 
   /* ERRORS */
 no_program:
@@ -787,6 +776,25 @@ no_stream:
   }
 }
 
+static GstFlowReturn
+gst_base_ts_mux_create_streams (GstBaseTsMux * mux)
+{
+  GstFlowReturn ret = GST_FLOW_OK;
+  GList *walk = GST_ELEMENT (mux)->sinkpads;
+
+  /* Create the streams */
+  while (walk) {
+    GstPad *pad = GST_PAD (walk->data);
+
+    ret = gst_base_ts_mux_create_pad_stream (mux, pad);
+    if (ret != GST_FLOW_OK)
+      return ret;
+    walk = g_list_next (walk);
+  }
+
+  return GST_FLOW_OK;
+}
+
 static void
 new_packet_common_init (GstBaseTsMux * mux, GstBuffer * buf, guint8 * data,
     guint len)
@@ -1078,8 +1086,22 @@ gst_base_ts_mux_aggregate_buffer (GstBaseTsMux * mux,
   }
 
   prog = best->prog;
-  if (prog == NULL)
-    goto no_program;
+  if (prog == NULL) {
+    GList *cur;
+
+    gst_base_ts_mux_create_pad_stream (mux, GST_PAD (best));
+    tsmux_resend_pat (mux->tsmux);
+    tsmux_resend_si (mux->tsmux);
+    prog = best->prog;
+    g_assert_nonnull (prog);
+
+    /* output PMT for each program */
+    for (cur = mux->tsmux->programs; cur; cur = cur->next) {
+      TsMuxProgram *program = (TsMuxProgram *) cur->data;
+
+      tsmux_resend_pmt (program);
+    }
+  }
 
   g_assert (buf != NULL);
 
@@ -1213,15 +1235,6 @@ write_fail:
   {
     return mux->last_flow_ret;
   }
-no_program:
-  {
-    if (buf)
-      gst_buffer_unref (buf);
-    GST_ELEMENT_ERROR (mux, STREAM, MUX,
-        ("Stream on pad %" GST_PTR_FORMAT
-            " is not associated with any program", best), (NULL));
-    return GST_FLOW_ERROR;
-  }
 }
 
 /* GstElement implementation */
@@ -1259,6 +1272,37 @@ stream_exists:
   }
 }
 
+static void
+gst_base_ts_mux_release_pad (GstElement * element, GstPad * pad)
+{
+  GstBaseTsMux *mux = GST_BASE_TS_MUX (element);
+
+  if (mux->tsmux) {
+    GList *cur;
+    GstBaseTsMuxPad *ts_pad = GST_BASE_TS_MUX_PAD (pad);
+    gint pid = ts_pad->pid;
+
+    if (ts_pad->prog->pcr_stream == ts_pad->stream) {
+      tsmux_stream_pcr_unref (ts_pad->prog->pcr_stream);
+      ts_pad->prog->pcr_stream = NULL;
+    }
+    if (tsmux_remove_stream (mux->tsmux, pid, ts_pad->prog)) {
+      g_hash_table_remove (mux->programs, GINT_TO_POINTER (ts_pad->prog_id));
+    }
+    tsmux_resend_pat (mux->tsmux);
+    tsmux_resend_si (mux->tsmux);
+
+    /* output PMT for each program */
+    for (cur = mux->tsmux->programs; cur; cur = cur->next) {
+      TsMuxProgram *program = (TsMuxProgram *) cur->data;
+
+      tsmux_resend_pmt (program);
+    }
+  }
+
+  gst_element_remove_pad (element, pad);
+}
+
 static gboolean
 gst_base_ts_mux_send_event (GstElement * element, GstEvent * event)
 {
@@ -1863,6 +1907,7 @@ gst_base_ts_mux_class_init (GstBaseTsMuxClass * klass)
   gobject_class->constructed = gst_base_ts_mux_constructed;
 
   gstelement_class->request_new_pad = gst_base_ts_mux_request_new_pad;
+  gstelement_class->release_pad = gst_base_ts_mux_release_pad;
   gstelement_class->send_event = gst_base_ts_mux_send_event;
 
   gstagg_class->update_src_caps = gst_base_ts_mux_update_src_caps;
index 92625b9..4c1ace9 100644 (file)
@@ -467,6 +467,25 @@ tsmux_program_new (TsMux * mux, gint prog_id)
   return program;
 }
 
+gboolean
+tsmux_program_delete (TsMux * mux, TsMuxProgram * program)
+{
+  g_return_val_if_fail (mux != NULL, FALSE);
+
+  if (mux->nb_programs == 0)
+    return FALSE;
+
+  if (!program)
+    return FALSE;
+
+  mux->programs = g_list_remove (mux->programs, program);
+  mux->nb_programs--;
+  mux->pat_changed = TRUE;
+  tsmux_program_free ((TsMuxProgram *) program);
+
+  return TRUE;
+}
+
 /**
  * tsmux_set_pmt_interval:
  * @program: a #TsMuxProgram
@@ -594,6 +613,8 @@ tsmux_program_add_stream (TsMuxProgram * program, TsMuxStream * stream)
   g_return_if_fail (program != NULL);
   g_return_if_fail (stream != NULL);
 
+  stream->program_array_index = program->streams->len;
+
   g_array_append_val (program->streams, stream);
   program->pmt_changed = TRUE;
 }
@@ -720,6 +741,35 @@ tsmux_find_stream (TsMux * mux, guint16 pid)
   return found;
 }
 
+gboolean
+tsmux_remove_stream (TsMux * mux, guint16 pid, TsMuxProgram * program)
+{
+  GList *cur;
+  gboolean ret = FALSE;
+
+  g_return_val_if_fail (mux != NULL, FALSE);
+
+  for (cur = mux->streams; cur; cur = cur->next) {
+    TsMuxStream *stream = (TsMuxStream *) cur->data;
+
+    if (tsmux_stream_get_pid (stream) == pid) {
+      if (program->streams->len == 1) {
+        tsmux_program_delete (mux, program);
+        ret = TRUE;
+      } else {
+        program->streams =
+            g_array_remove_index (program->streams,
+            stream->program_array_index);
+      }
+
+      mux->streams = g_list_remove (mux->streams, stream);
+      tsmux_stream_free (stream);
+      return ret;
+    }
+  }
+  return ret;
+}
+
 static gboolean
 tsmux_get_buffer (TsMux * mux, GstBuffer ** buf)
 {
index a7e62a5..8dbb9da 100644 (file)
@@ -225,6 +225,7 @@ void                tsmux_resend_pmt                (TsMuxProgram *program);
 void            tsmux_program_set_scte35_pid    (TsMuxProgram *program, guint16 pid);
 guint16         tsmux_program_get_scte35_pid    (TsMuxProgram *program);
 void            tsmux_program_set_scte35_interval (TsMuxProgram *mux, guint interval);
+gboolean        tsmux_program_delete            (TsMux *mux, TsMuxProgram *program);
 
 
 /* SI table management */
@@ -239,6 +240,7 @@ gboolean        tsmux_send_section              (TsMux *mux, GstMpegtsSection *s
 /* stream management */
 TsMuxStream *  tsmux_create_stream             (TsMux *mux, guint stream_type, guint16 pid, gchar *language);
 TsMuxStream *  tsmux_find_stream               (TsMux *mux, guint16 pid);
+gboolean        tsmux_remove_stream             (TsMux *mux, guint16 pid, TsMuxProgram *program);
 
 void           tsmux_program_add_stream        (TsMuxProgram *program, TsMuxStream *stream);
 void           tsmux_program_set_pcr_stream    (TsMuxProgram *program, TsMuxStream *stream);
index 5eb6abf..beb93bd 100644 (file)
@@ -133,6 +133,7 @@ tsmux_stream_new (guint16 pid, guint stream_type)
   stream->pes_payload_size = 0;
   stream->cur_pes_payload_size = 0;
   stream->pes_bytes_written = 0;
+  stream->program_array_index = -1;
 
   switch (stream_type) {
     case TSMUX_ST_VIDEO_MPEG1:
index ce8c512..93f0467 100644 (file)
@@ -166,6 +166,8 @@ struct TsMuxStream {
   guint8 id;
   /* extended stream id (13818-1 Amdt 2) */
   guint8 id_extended;
+  /* array index in program array */
+  gint program_array_index;
 
   gboolean is_video_stream;
 
index 2c8a495..3695434 100644 (file)
@@ -148,30 +148,18 @@ cleanup_tsmux (GstElement * mux, const gchar * sinkname)
 }
 
 static void
-check_tsmux_pad (GstStaticPadTemplate * srctemplate,
+check_tsmux_pad_given_muxer (GstElement * mux,
     const gchar * src_caps_string, gint pes_id, gint pmt_id,
-    const gchar * sinkname, CheckOutputBuffersFunc check_func, guint n_bufs,
-    gssize input_buf_size, guint alignment)
+    CheckOutputBuffersFunc check_func, guint n_bufs, gssize input_buf_size)
 {
   GstClockTime ts;
-  GstElement *mux;
   GstBuffer *inbuffer, *outbuffer;
   GstCaps *caps;
   gint num_buffers;
   gint i;
   gint pmt_pid = -1, el_pid = -1, pcr_pid = -1, packets = 0;
-  gchar *padname;
   GstQuery *drain;
 
-  mux = setup_tsmux (srctemplate, sinkname, &padname);
-
-  if (alignment != 0)
-    g_object_set (mux, "alignment", alignment, NULL);
-
-  fail_unless (gst_element_set_state (mux,
-          GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
-      "could not set to playing");
-
   caps = gst_caps_from_string (src_caps_string);
   gst_check_setup_events (mysrcpad, mux, caps, GST_FORMAT_TIME);
   gst_caps_unref (caps);
@@ -348,11 +336,68 @@ check_tsmux_pad (GstStaticPadTemplate * srctemplate,
 
   g_list_free (buffers);
   buffers = NULL;
+}
+
+static void
+check_tsmux_pad (GstStaticPadTemplate * srctemplate,
+    const gchar * src_caps_string, gint pes_id, gint pmt_id,
+    const gchar * sinkname, CheckOutputBuffersFunc check_func, guint n_bufs,
+    gssize input_buf_size, guint alignment)
+{
+  gchar *padname;
+  GstElement *mux;
+
+  mux = setup_tsmux (srctemplate, sinkname, &padname);
+
+  if (alignment != 0)
+    g_object_set (mux, "alignment", alignment, NULL);
+
+  fail_unless (gst_element_set_state (mux,
+          GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
+      "could not set to playing");
+
+  check_tsmux_pad_given_muxer (mux, src_caps_string, pes_id, pmt_id,
+      check_func, n_bufs, input_buf_size);
 
   cleanup_tsmux (mux, padname);
   g_free (padname);
 }
 
+GST_START_TEST (test_reappearing_pad)
+{
+  gchar *padname;
+  GstElement *mux;
+  GstPad *pad;
+
+  mux = gst_check_setup_element ("mpegtsmux");
+  mysrcpad = setup_src_pad (mux, &video_src_template, "sink_%d", &padname);
+  mysinkpad = gst_check_setup_sink_pad (mux, &sink_template);
+  gst_pad_set_active (mysrcpad, TRUE);
+  gst_pad_set_active (mysinkpad, TRUE);
+
+  fail_unless (gst_element_set_state (mux,
+          GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
+      "could not set to playing");
+
+  check_tsmux_pad_given_muxer (mux, VIDEO_CAPS_STRING, 0xE0, 0x1b, NULL, 1, 1);
+
+  pad = gst_element_get_static_pad (mux, padname);
+  gst_pad_set_active (mysrcpad, FALSE);
+  gst_object_unref (pad);
+  teardown_src_pad (mux, padname);
+  gst_element_release_request_pad (mux, pad);
+  g_free (padname);
+
+  mysrcpad = setup_src_pad (mux, &video_src_template, "sink_%d", &padname);
+  gst_pad_set_active (mysrcpad, TRUE);
+
+  check_tsmux_pad_given_muxer (mux, VIDEO_CAPS_STRING, 0xE0, 0x1b, NULL, 1, 1);
+
+  cleanup_tsmux (mux, padname);
+  g_free (padname);
+}
+
+GST_END_TEST;
 
 GST_START_TEST (test_video)
 {
@@ -491,6 +536,7 @@ mpegtsmux_suite (void)
   tcase_add_test (tc_chain, test_multiple_state_change);
   tcase_add_test (tc_chain, test_align);
   tcase_add_test (tc_chain, test_keyframe_flag_propagation);
+  tcase_add_test (tc_chain, test_reappearing_pad);
 
   return s;
 }