mpegtsbase/tsdemux: Fix stream/pad activation order
authorEdward Hervey <edward.hervey@collabora.co.uk>
Thu, 21 Jul 2011 11:26:55 +0000 (13:26 +0200)
committerEdward Hervey <edward.hervey@collabora.co.uk>
Mon, 25 Jul 2011 16:56:49 +0000 (18:56 +0200)
We first activate new streams before shutting down old ones.
We emit no-more-pads after we add new streams and emit EOS before
removing old ones.
Also cleanup/refactor a bit more of the code accordingly

gst/mpegtsdemux/mpegtsbase.c
gst/mpegtsdemux/mpegtsbase.h
gst/mpegtsdemux/tsdemux.c

index 46e06cb..ac4b615 100644 (file)
@@ -331,6 +331,35 @@ mpegts_get_descriptor_from_stream (MpegTSBaseStream * stream, guint8 tag)
   return retval;
 }
 
+typedef struct
+{
+  gboolean res;
+  guint16 pid;
+} PIDLookup;
+
+static void
+foreach_pid_in_program (gpointer key, MpegTSBaseProgram * program,
+    PIDLookup * lookup)
+{
+  if (!program->active)
+    return;
+  if (program->streams[lookup->pid])
+    lookup->res = TRUE;
+}
+
+static gboolean
+mpegts_pid_in_active_programs (MpegTSBase * base, guint16 pid)
+{
+  PIDLookup lookup;
+
+  lookup.res = FALSE;
+  lookup.pid = pid;
+  g_hash_table_foreach (base->programs, (GHFunc) foreach_pid_in_program,
+      &lookup);
+
+  return lookup.res;
+}
+
 /* returns NULL if no matching descriptor found *
  * otherwise returns a descriptor that needs to *
  * be freed */
@@ -363,8 +392,8 @@ mpegts_get_descriptor_from_program (MpegTSBaseProgram * program, guint8 tag)
   return retval;
 }
 
-MpegTSBaseProgram *
-mpegts_base_add_program (MpegTSBase * base,
+static MpegTSBaseProgram *
+mpegts_base_new_program (MpegTSBase * base,
     gint program_number, guint16 pmt_pid)
 {
   MpegTSBaseProgram *program;
@@ -379,6 +408,23 @@ mpegts_base_add_program (MpegTSBase * base,
   program->streams = g_new0 (MpegTSBaseStream *, 0x2000);
   program->patcount = 0;
 
+  return program;
+}
+
+MpegTSBaseProgram *
+mpegts_base_add_program (MpegTSBase * base,
+    gint program_number, guint16 pmt_pid)
+{
+  MpegTSBaseProgram *program;
+
+  GST_DEBUG_OBJECT (base, "program_number : %d, pmt_pid : %d",
+      program_number, pmt_pid);
+
+  program = mpegts_base_new_program (base, program_number, pmt_pid);
+
+  /* Mark the PMT PID as being a known PSI PID */
+  MPEGTS_BIT_SET (base->known_psi, pmt_pid);
+
   g_hash_table_insert (base->programs,
       GINT_TO_POINTER (program_number), program);
 
@@ -396,41 +442,20 @@ mpegts_base_get_program (MpegTSBase * base, gint program_number)
   return program;
 }
 
-#if 0
-static GstPad *
-mpegts_base_activate_program (MpegTSBase * base, MpegTSBaseProgram * program)
-{
-  MpegTSBasePad *tspad;
-  gchar *pad_name;
-
-  pad_name = g_strdup_printf ("program_%d", program->program_number);
-
-  tspad = mpegts_base_create_tspad (base, pad_name);
-  tspad->program_number = program->program_number;
-  tspad->program = program;
-  program->tspad = tspad;
-  g_free (pad_name);
-  gst_pad_set_active (tspad->pad, TRUE);
-  program->active = TRUE;
-
-  return tspad->pad;
-}
-
-static GstPad *
-mpegts_base_deactivate_program (MpegTSBase * base, MpegTSBaseProgram * program)
+static MpegTSBaseProgram *
+mpegts_base_steal_program (MpegTSBase * base, gint program_number)
 {
-  MpegTSBasePad *tspad;
+  MpegTSBaseProgram *program;
 
-  tspad = program->tspad;
-  gst_pad_set_active (tspad->pad, FALSE);
-  program->active = FALSE;
+  program = (MpegTSBaseProgram *) g_hash_table_lookup (base->programs,
+      GINT_TO_POINTER ((gint) program_number));
 
-  /* tspad will be destroyed in GstElementClass::pad_removed */
+  if (program)
+    g_hash_table_steal (base->programs,
+        GINT_TO_POINTER ((gint) program_number));
 
-  return tspad->pad;
+  return program;
 }
-#endif
-
 
 static void
 mpegts_base_free_program (MpegTSBaseProgram * program)
@@ -453,6 +478,8 @@ mpegts_base_free_program (MpegTSBaseProgram * program)
   g_free (program);
 }
 
+/* FIXME : This is being called by tsdemux::find_timestamps()
+ * We need to avoid re-entrant code like that */
 void
 mpegts_base_remove_program (MpegTSBase * base, gint program_number)
 {
@@ -482,6 +509,11 @@ mpegts_base_program_add_stream (MpegTSBase * base,
   GST_DEBUG ("pid:0x%04x, stream_type:0x%03x, stream_info:%" GST_PTR_FORMAT,
       pid, stream_type, stream_info);
 
+  if (G_UNLIKELY (program->streams[pid])) {
+    GST_WARNING ("Stream already present !");
+    return NULL;
+  }
+
   stream = g_malloc0 (base->stream_size);
   stream->pid = pid;
   stream->stream_type = stream_type;
@@ -529,39 +561,108 @@ mpegts_base_program_remove_stream (MpegTSBase * base,
 }
 
 static void
-mpegts_base_deactivate_pmt (MpegTSBase * base, MpegTSBaseProgram * program)
+mpegts_base_deactivate_program (MpegTSBase * base, MpegTSBaseProgram * program)
 {
-  gint i;
+  gint i, nbstreams;
   guint pid;
-  guint stream_type;
   GstStructure *stream;
   const GValue *streams;
   const GValue *value;
   MpegTSBaseClass *klass = GST_MPEGTS_BASE_GET_CLASS (base);
 
+  if (G_UNLIKELY (program->active == FALSE))
+    return;
+
   GST_DEBUG_OBJECT (base, "Deactivating PMT");
 
+  program->active = FALSE;
+
   if (program->pmt_info) {
     /* Inform subclasses we're deactivating this program */
     if (klass->program_stopped)
       klass->program_stopped (base, program);
 
     streams = gst_structure_id_get_value (program->pmt_info, QUARK_STREAMS);
+    nbstreams = gst_value_list_get_size (streams);
 
-    for (i = 0; i < gst_value_list_get_size (streams); ++i) {
+    for (i = 0; i < nbstreams; ++i) {
       value = gst_value_list_get_value (streams, i);
       stream = g_value_get_boxed (value);
-      gst_structure_id_get (stream, QUARK_PID, G_TYPE_UINT, &pid,
-          QUARK_STREAM_TYPE, G_TYPE_UINT, &stream_type, NULL);
+
+      gst_structure_id_get (stream, QUARK_PID, G_TYPE_UINT, &pid, NULL);
       mpegts_base_program_remove_stream (base, program, (guint16) pid);
-      MPEGTS_BIT_UNSET (base->is_pes, pid);
+
+      /* Only unset the is_pes bit if the PID isn't used in any other active
+       * program */
+      if (!mpegts_pid_in_active_programs (base, pid))
+        MPEGTS_BIT_UNSET (base->is_pes, pid);
     }
+
     /* remove pcr stream */
+    /* FIXME : This might actually be shared with another stream ? */
     mpegts_base_program_remove_stream (base, program, program->pcr_pid);
-    MPEGTS_BIT_UNSET (base->is_pes, program->pcr_pid);
+    if (!mpegts_pid_in_active_programs (base, program->pcr_pid))
+      MPEGTS_BIT_UNSET (base->is_pes, program->pcr_pid);
+
+    GST_DEBUG ("program stream_list is now %p", program->stream_list);
   }
 }
 
+static void
+mpegts_base_activate_program (MpegTSBase * base, MpegTSBaseProgram * program,
+    guint16 pmt_pid, GstStructure * pmt_info)
+{
+  guint i, nbstreams;
+  guint pcr_pid;
+  guint pid;
+  guint16 stream_type;
+  GstStructure *stream;
+  const GValue *new_streams;
+  const GValue *value;
+  MpegTSBaseClass *klass;
+
+  if (G_UNLIKELY (program->active))
+    return;
+
+  GST_DEBUG ("Activating program %d", program->program_number);
+
+  gst_structure_id_get (pmt_info, QUARK_PCR_PID, G_TYPE_UINT, &pcr_pid, NULL);
+
+  /* activate new pmt */
+  if (program->pmt_info)
+    gst_structure_free (program->pmt_info);
+  program->pmt_info = gst_structure_copy (pmt_info);
+  program->pmt_pid = pmt_pid;
+  program->pcr_pid = pcr_pid;
+
+  new_streams = gst_structure_id_get_value (pmt_info, QUARK_STREAMS);
+  nbstreams = gst_value_list_get_size (new_streams);
+
+  for (i = 0; i < nbstreams; ++i) {
+    value = gst_value_list_get_value (new_streams, i);
+    stream = g_value_get_boxed (value);
+
+    gst_structure_id_get (stream, QUARK_PID, G_TYPE_UINT, &pid,
+        QUARK_STREAM_TYPE, G_TYPE_UINT, &stream_type, NULL);
+    MPEGTS_BIT_SET (base->is_pes, pid);
+    mpegts_base_program_add_stream (base, program,
+        (guint16) pid, (guint8) stream_type, stream);
+
+  }
+  /* We add the PCR pid last. If that PID is already used by one of the media
+   * streams above, no new stream will be created */
+  mpegts_base_program_add_stream (base, program, (guint16) pcr_pid, -1, NULL);
+  MPEGTS_BIT_SET (base->is_pes, pcr_pid);
+
+
+  program->active = TRUE;
+
+  klass = GST_MPEGTS_BASE_GET_CLASS (base);
+  if (klass->program_started != NULL)
+    klass->program_started (base, program);
+
+  GST_DEBUG_OBJECT (base, "new pmt %" GST_PTR_FORMAT, pmt_info);
+}
 
 gboolean
 mpegts_base_is_psi (MpegTSBase * base, MpegTSPacketizerPacket * packet)
@@ -628,22 +729,32 @@ mpegts_base_apply_pat (MpegTSBase * base, GstStructure * pat_info)
   guint program_number;
   guint pid;
   MpegTSBaseProgram *program;
-  gint i;
+  gint i, nbprograms;
   const GValue *programs;
-  MpegTSBaseClass *klass = GST_MPEGTS_BASE_GET_CLASS (base);
+
+  GST_INFO_OBJECT (base, "PAT %" GST_PTR_FORMAT, pat_info);
+
+  /* Applying a new PAT does two things:
+   * * It adds the new programs to the list of programs this element handles
+   *   and increments at the same time the number of times a program is referenced.
+   *
+   * * If there was a previously active PAT, It decrements the reference count
+   *   of all program it used. If a program is no longer needed, it is removed.
+   */
 
   old_pat = base->pat;
   base->pat = gst_structure_copy (pat_info);
 
-  GST_INFO_OBJECT (base, "PAT %" GST_PTR_FORMAT, pat_info);
-
   gst_element_post_message (GST_ELEMENT_CAST (base),
       gst_message_new_element (GST_OBJECT (base),
           gst_structure_copy (pat_info)));
 
-  programs = gst_structure_id_get_value (pat_info, QUARK_PROGRAMS);
+
+  GST_LOG ("Activating new Program Association Table");
   /* activate the new table */
-  for (i = 0; i < gst_value_list_get_size (programs); ++i) {
+  programs = gst_structure_id_get_value (pat_info, QUARK_PROGRAMS);
+  nbprograms = gst_value_list_get_size (programs);
+  for (i = 0; i < nbprograms; ++i) {
     value = gst_value_list_get_value (programs, i);
 
     program_info = g_value_get_boxed (value);
@@ -652,6 +763,7 @@ mpegts_base_apply_pat (MpegTSBase * base, GstStructure * pat_info)
 
     program = mpegts_base_get_program (base, program_number);
     if (program) {
+      /* IF the program already existed, just check if the PMT PID changed */
       if (program->pmt_pid != pid) {
         if (program->pmt_pid != G_MAXUINT16) {
           /* pmt pid changed */
@@ -665,17 +777,20 @@ mpegts_base_apply_pat (MpegTSBase * base, GstStructure * pat_info)
         MPEGTS_BIT_SET (base->known_psi, pid);
       }
     } else {
-      MPEGTS_BIT_SET (base->known_psi, pid);
+      /* Create a new program */
       program = mpegts_base_add_program (base, program_number, pid);
     }
+    /* We mark this program as being referenced by one PAT */
     program->patcount += 1;
   }
 
   if (old_pat) {
     /* deactivate the old table */
+    GST_LOG ("Deactivating old Program Association Table");
 
     programs = gst_structure_id_get_value (old_pat, QUARK_PROGRAMS);
-    for (i = 0; i < gst_value_list_get_size (programs); ++i) {
+    nbprograms = gst_value_list_get_size (programs);
+    for (i = 0; i < nbprograms; ++i) {
       value = gst_value_list_get_value (programs, i);
 
       program_info = g_value_get_boxed (value);
@@ -684,7 +799,7 @@ mpegts_base_apply_pat (MpegTSBase * base, GstStructure * pat_info)
           QUARK_PID, G_TYPE_UINT, &pid, NULL);
 
       program = mpegts_base_get_program (base, program_number);
-      if (program == NULL) {
+      if (G_UNLIKELY (program == NULL)) {
         GST_DEBUG_OBJECT (base, "broken PAT, duplicated entry for program %d",
             program_number);
         continue;
@@ -697,10 +812,7 @@ mpegts_base_apply_pat (MpegTSBase * base, GstStructure * pat_info)
       GST_INFO_OBJECT (base, "PAT removing program %" GST_PTR_FORMAT,
           program_info);
 
-      if (klass->program_stopped) {
-        klass->program_stopped (base, program);
-      }
-      mpegts_base_deactivate_pmt (base, program);
+      mpegts_base_deactivate_program (base, program);
       mpegts_base_remove_program (base, program_number);
       /* FIXME: when this happens it may still be pmt pid of another
        * program, so setting to False may make it go through expensive
@@ -711,26 +823,17 @@ mpegts_base_apply_pat (MpegTSBase * base, GstStructure * pat_info)
 
     gst_structure_free (old_pat);
   }
-#if 0
-  mpegts_base_sync_program_pads (base);
-#endif
 }
 
 static void
 mpegts_base_apply_pmt (MpegTSBase * base,
     guint16 pmt_pid, GstStructure * pmt_info)
 {
-  MpegTSBaseProgram *program;
+  MpegTSBaseProgram *program, *old_program;
   guint program_number;
-  guint pcr_pid;
-  guint pid;
-  guint stream_type;
-  GstStructure *stream;
-  gint i;
-  const GValue *new_streams;
-  const GValue *value;
-  MpegTSBaseClass *klass = GST_MPEGTS_BASE_GET_CLASS (base);
+  gboolean deactivate_old_program = FALSE;
 
+  /* FIXME : not so sure this is valid anymore */
   if (G_UNLIKELY (base->seen_pat == FALSE)) {
     GST_WARNING ("Got pmt without pat first. Returning");
     /* remove the stream since we won't get another PMT otherwise */
@@ -738,57 +841,55 @@ mpegts_base_apply_pmt (MpegTSBase * base,
     return;
   }
 
-  GST_DEBUG ("Applying PMT (pid:0x%04x)", pmt_pid);
-
-  gst_structure_id_get (pmt_info,
-      QUARK_PROGRAM_NUMBER, G_TYPE_UINT, &program_number,
-      QUARK_PCR_PID, G_TYPE_UINT, &pcr_pid, NULL);
-  new_streams = gst_structure_id_get_value (pmt_info, QUARK_STREAMS);
-
-  program = mpegts_base_get_program (base, program_number);
-  if (program) {
-    GST_DEBUG ("Deactivating old program");
-    /* deactivate old pmt */ ;
-    mpegts_base_deactivate_pmt (base, program);
-    if (program->pmt_info)
-      gst_structure_free (program->pmt_info);
-    program->pmt_info = NULL;
-  } else {
-    /* no PAT?? */
-    MPEGTS_BIT_SET (base->known_psi, pmt_pid);
-    program = mpegts_base_add_program (base, program_number, pid);
-  }
-
-  GST_DEBUG ("Now activating new program");
+  gst_structure_id_get (pmt_info, QUARK_PROGRAM_NUMBER, G_TYPE_UINT,
+      &program_number, NULL);
 
-  /* activate new pmt */
-  program->pmt_info = gst_structure_copy (pmt_info);
-  program->pmt_pid = pmt_pid;
-  program->pcr_pid = pcr_pid;
-  mpegts_base_program_add_stream (base, program, (guint16) pcr_pid, -1, NULL);
-  MPEGTS_BIT_SET (base->is_pes, pcr_pid);
+  GST_DEBUG ("Applying PMT (program_number:%d, pid:0x%04x)",
+      program_number, pmt_pid);
 
-  for (i = 0; i < gst_value_list_get_size (new_streams); ++i) {
-    value = gst_value_list_get_value (new_streams, i);
-    stream = g_value_get_boxed (value);
+  /* In order for stream switching to happen properly in decodebin(2),
+   * we need to first add the new pads (i.e. activate the new program)
+   * before removing the old ones (i.e. deactivating the old program)
+   */
 
-    gst_structure_id_get (stream, QUARK_PID, G_TYPE_UINT, &pid,
-        QUARK_STREAM_TYPE, G_TYPE_UINT, &stream_type, NULL);
-    MPEGTS_BIT_SET (base->is_pes, pid);
-    mpegts_base_program_add_stream (base, program,
-        (guint16) pid, (guint8) stream_type, stream);
+  old_program = mpegts_base_get_program (base, program_number);
+  if (G_UNLIKELY (old_program == NULL))
+    goto no_program;
+
+  /* If the current program is active, this means we have a new program */
+  if (old_program->active) {
+    old_program = mpegts_base_steal_program (base, program_number);
+    program = mpegts_base_new_program (base, program_number, pmt_pid);
+    g_hash_table_insert (base->programs,
+        GINT_TO_POINTER (program_number), program);
+    deactivate_old_program = TRUE;
+  } else
+    program = old_program;
 
-  }
+  /* First activate program */
+  mpegts_base_activate_program (base, program, pmt_pid, pmt_info);
 
-  if (klass->program_started != NULL) {
-    klass->program_started (base, program);
+  if (deactivate_old_program) {
+    /* deactivate old pmt */ ;
+    mpegts_base_deactivate_program (base, old_program);
+    mpegts_base_free_program (old_program);
   }
 
-  GST_DEBUG_OBJECT (base, "new pmt %" GST_PTR_FORMAT, pmt_info);
+  /* if (program->pmt_info) */
+  /*   gst_structure_free (program->pmt_info); */
+  /* program->pmt_info = NULL; */
 
   gst_element_post_message (GST_ELEMENT_CAST (base),
       gst_message_new_element (GST_OBJECT (base),
           gst_structure_copy (pmt_info)));
+
+  return;
+
+no_program:
+  {
+    GST_ERROR ("Attempted to apply a PMT on a program that wasn't created");
+    return;
+  }
 }
 
 static void
index 44cb501..369078d 100644 (file)
@@ -70,6 +70,9 @@ struct _MpegTSBaseProgram
   /* Pending Tags for the program */
   GstTagList *tags;
   guint event_id;
+
+  /* TRUE if the program is currently being used */
+  gboolean active;
 };
 
 typedef enum {
index 9e70073..6ea958b 100644 (file)
@@ -1177,6 +1177,10 @@ create_pad_for_stream (MpegTSBase * base, MpegTSBaseStream * bstream,
       name = g_strdup_printf ("subpicture_%04x", bstream->pid);
       caps = gst_caps_new_simple ("subpicture/x-pgs", NULL);
       break;
+    default:
+      GST_WARNING ("Non-media stream (stream_type:0x%x). Not creating pad",
+          bstream->stream_type);
+      break;
   }
   if (template && name && caps) {
     GST_LOG ("stream:%p creating pad with name %s and caps %s", stream, name,
@@ -1214,6 +1218,7 @@ static void
 gst_ts_demux_stream_removed (MpegTSBase * base, MpegTSBaseStream * bstream)
 {
   TSDemuxStream *stream = (TSDemuxStream *) bstream;
+
   if (stream) {
     if (stream->pad) {
       /* Unref the pad, clear it */
@@ -1234,7 +1239,10 @@ activate_pad_for_stream (GstTSDemux * tsdemux, TSDemuxStream * stream)
     gst_element_add_pad ((GstElement *) tsdemux, stream->pad);
     GST_DEBUG_OBJECT (stream->pad, "done adding pad");
   } else
-    GST_WARNING_OBJECT (tsdemux, "stream %p has no pad", stream);
+    GST_WARNING_OBJECT (tsdemux,
+        "stream %p (pid 0x%04x, type:0x%03x) has no pad", stream,
+        ((MpegTSBaseStream *) stream)->pid,
+        ((MpegTSBaseStream *) stream)->stream_type);
 }
 
 static void
@@ -1264,6 +1272,9 @@ gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
 {
   GstTSDemux *demux = GST_TS_DEMUX (base);
 
+  GST_DEBUG ("Current program %d, new program %d",
+      demux->program_number, program->program_number);
+
   if (demux->program_number == -1 ||
       demux->program_number == program->program_number) {
     GList *tmp;
@@ -1272,17 +1283,16 @@ gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
     demux->program_number = program->program_number;
     demux->program = program;
 
-    /* Activate all stream pads, the pads will already have been created */
-
-    /* FIXME : Actually, we don't want to activate *ALL* streams !
-     * For example, we don't want to expose HDV AUX private streams, we will just
-     * be using them directly for seeking and metadata. */
-    if (base->mode != BASE_MODE_SCANNING)
+    /* Activate all stream pads, pads will already have been created */
+    if (base->mode != BASE_MODE_SCANNING) {
       for (tmp = program->stream_list; tmp; tmp = tmp->next)
         activate_pad_for_stream (demux, (TSDemuxStream *) tmp->data);
+      gst_element_no_more_pads ((GstElement *) demux);
+    }
 
     /* Inform scanner we have got our program */
     demux->current_program_number = program->program_number;
+    demux->need_newsegment = TRUE;
   }
 }
 
@@ -1295,23 +1305,20 @@ gst_ts_demux_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program)
 
   GST_LOG ("program %d stopped", program->program_number);
 
-  if (demux->program == NULL || program != demux->program)
-    return;
-
-  for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
+  for (tmp = program->stream_list; tmp; tmp = tmp->next) {
     localstream = (TSDemuxStream *) tmp->data;
     if (localstream->pad) {
-      GST_DEBUG ("HAVE PAD %s:%s", GST_DEBUG_PAD_NAME (localstream->pad));
-      if (gst_pad_is_active (localstream->pad))
+      if (gst_pad_is_active (localstream->pad)) {
+        GST_DEBUG ("Pushing EOS and deactivating pad %s:%s",
+            GST_DEBUG_PAD_NAME (localstream->pad));
+        gst_pad_push_event (localstream->pad, gst_event_new_eos ());
+        gst_pad_set_active (localstream->pad, FALSE);
         gst_element_remove_pad (GST_ELEMENT_CAST (demux), localstream->pad);
-      else
+      else
         gst_object_unref (localstream->pad);
       localstream->pad = NULL;
     }
   }
-
-  demux->program = NULL;
-  demux->program_number = -1;
 }
 
 static gboolean
@@ -2081,7 +2088,8 @@ calculate_and_push_newsegment (GstTSDemux * demux, TSDemuxStream * stream)
 
     start = firstpts;
     stop = GST_CLOCK_TIME_NONE;
-    position = demux->segment.time;
+    position = demux->segment.time ? firstpts - demux->segment.time : 0;
+    demux->segment.time = start;
   } else {
     /* pull mode */
     GST_DEBUG ("pull-based. Segment start:%" GST_TIME_FORMAT " duration:%"