mpegtsbase/tsdemux: Refactor seek and segment handling
authorEdward Hervey <edward.hervey@collabora.co.uk>
Thu, 1 Mar 2012 17:05:17 +0000 (18:05 +0100)
committerEdward Hervey <edward.hervey@collabora.co.uk>
Thu, 1 Mar 2012 17:15:51 +0000 (18:15 +0100)
All calculations go through the mpegtspacketizer
Remove unused variables/code

gst/mpegtsdemux/gstmpegdefs.h
gst/mpegtsdemux/mpegtsbase.c
gst/mpegtsdemux/mpegtsbase.h
gst/mpegtsdemux/tsdemux.c
gst/mpegtsdemux/tsdemux.h

index 66f922b..62d98c7 100644 (file)
             GST_MSECOND/10, CLOCK_BASE))
 #define GSTTIME_TO_MPEGTIME(time) (gst_util_uint64_scale ((time), \
             CLOCK_BASE, GST_MSECOND/10))
+#define GSTTIME_TO_PCRTIME(time) (gst_util_uint64_scale ((time), \
+            300 * CLOCK_BASE, GST_MSECOND/10))
 
 #define MPEG_MUX_RATE_MULT      50
 
index 0f6a308..45630f2 100644 (file)
@@ -221,9 +221,7 @@ mpegts_base_reset (MpegTSBase * base)
 
   base->mode = BASE_MODE_STREAMING;
   base->seen_pat = FALSE;
-  base->first_pat_offset = -1;
-  base->in_gap = 0;
-  base->first_buf_ts = GST_CLOCK_TIME_NONE;
+  base->seek_offset = -1;
 
   base->upstream_live = FALSE;
   base->query_latency = FALSE;
@@ -621,7 +619,7 @@ mpegts_base_deactivate_program (MpegTSBase * base, MpegTSBaseProgram * program)
 
 static void
 mpegts_base_activate_program (MpegTSBase * base, MpegTSBaseProgram * program,
-    guint16 pmt_pid, GstStructure * pmt_info)
+    guint16 pmt_pid, GstStructure * pmt_info, gboolean initial_program)
 {
   guint i, nbstreams;
   guint pcr_pid;
@@ -666,8 +664,8 @@ mpegts_base_activate_program (MpegTSBase * base, MpegTSBaseProgram * program,
   mpegts_base_program_add_stream (base, program, (guint16) pcr_pid, -1, NULL);
   MPEGTS_BIT_SET (base->is_pes, pcr_pid);
 
-
   program->active = TRUE;
+  program->initial_program = initial_program;
 
   klass = GST_MPEGTS_BASE_GET_CLASS (base);
   if (klass->program_started != NULL)
@@ -855,6 +853,7 @@ mpegts_base_apply_pmt (MpegTSBase * base,
 {
   MpegTSBaseProgram *program, *old_program;
   guint program_number;
+  gboolean initial_program = TRUE;
 
   /* FIXME : not so sure this is valid anymore */
   if (G_UNLIKELY (base->seen_pat == FALSE)) {
@@ -889,11 +888,13 @@ mpegts_base_apply_pmt (MpegTSBase * base,
     /* Desactivate the old program */
     mpegts_base_deactivate_program (base, old_program);
     mpegts_base_free_program (old_program);
+    initial_program = FALSE;
   } else
     program = old_program;
 
   /* First activate program */
-  mpegts_base_activate_program (base, program, pmt_pid, pmt_info);
+  mpegts_base_activate_program (base, program, pmt_pid, pmt_info,
+      initial_program);
 
   /* if (program->pmt_info) */
   /*   gst_structure_free (program->pmt_info); */
@@ -986,9 +987,10 @@ mpegts_base_handle_psi (MpegTSBase * base, MpegTSPacketizerSection * section)
         mpegts_base_apply_pat (base, structure);
         if (base->seen_pat == FALSE) {
           base->seen_pat = TRUE;
-          base->first_pat_offset = GST_BUFFER_OFFSET (section->buffer);
           GST_DEBUG ("First PAT offset: %" G_GUINT64_FORMAT,
-              base->first_pat_offset);
+              GST_BUFFER_OFFSET (section->buffer));
+          mpegts_packetizer_set_reference_offset (base->packetizer,
+              GST_BUFFER_OFFSET (section->buffer));
         }
 
       } else
@@ -1215,8 +1217,6 @@ mpegts_base_sink_event (GstPad * pad, GstEvent * event)
       gst_segment_set_newsegment_full (&base->segment, update, rate,
           applied_rate, format, start, stop, position);
       gst_event_unref (event);
-      base->in_gap = GST_CLOCK_TIME_NONE;
-      base->first_buf_ts = GST_CLOCK_TIME_NONE;
     }
       break;
     case GST_EVENT_EOS:
@@ -1231,7 +1231,6 @@ mpegts_base_sink_event (GstPad * pad, GstEvent * event)
     case GST_EVENT_FLUSH_STOP:
       gst_segment_init (&base->segment, GST_FORMAT_UNDEFINED);
       base->seen_pat = FALSE;
-      base->first_pat_offset = -1;
       /* Passthrough */
     default:
       res = GST_MPEGTS_BASE_GET_CLASS (base)->push_event (base, event);
@@ -1256,9 +1255,6 @@ query_upstream_latency (MpegTSBase * base)
     GST_WARNING_OBJECT (base, "Failed to query upstream latency");
   gst_query_unref (query);
   base->query_latency = TRUE;
-
-  /* Calculate clock skew for live streams only */
-  base->packetizer->calculate_skew = base->upstream_live;
 }
 
 static inline GstFlowReturn
@@ -1293,13 +1289,6 @@ mpegts_base_chain (GstPad * pad, GstBuffer * buf)
     query_upstream_latency (base);
   }
 
-  if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (base->first_buf_ts)) &&
-      GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
-    base->first_buf_ts = GST_BUFFER_TIMESTAMP (buf);
-    GST_DEBUG_OBJECT (base, "first buffer timestamp %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (base->first_buf_ts));
-  }
-
   mpegts_packetizer_push (base->packetizer, buf);
   while (((pret = mpegts_packetizer_next_packet (base->packetizer,
                   &packet)) != PACKET_NEED_MORE) && res == GST_FLOW_OK) {
@@ -1347,17 +1336,20 @@ mpegts_base_scan (MpegTSBase * base)
   GstFlowReturn ret;
   GstBuffer *buf;
   guint i;
-  MpegTSBaseClass *klass = GST_MPEGTS_BASE_GET_CLASS (base);
+  gboolean done = FALSE;
+  MpegTSPacketizerPacketReturn pret;
+  gint64 tmpval;
+  guint64 upstream_size, seek_pos;
+  GstFormat format;
+  guint initial_pcr_seen;
 
   GST_DEBUG ("Scanning for initial sync point");
 
-  /* Find initial sync point */
-  for (i = 0; i < 10; i++) {
-    GST_DEBUG ("Grabbing %d => %d", i * 50 * MPEGTS_MAX_PACKETSIZE,
-        50 * MPEGTS_MAX_PACKETSIZE);
+  /* Find initial sync point and at least 5 PCR values */
+  for (i = 0; i < 10 && !done; i++) {
+    GST_DEBUG ("Grabbing %d => %d", i * 65536, 65536);
 
-    ret = gst_pad_pull_range (base->sinkpad, i * 50 * MPEGTS_MAX_PACKETSIZE,
-        50 * MPEGTS_MAX_PACKETSIZE, &buf);
+    ret = gst_pad_pull_range (base->sinkpad, i * 65536, 65536, &buf);
     if (G_UNLIKELY (ret != GST_FLOW_OK))
       goto beach;
 
@@ -1365,33 +1357,80 @@ mpegts_base_scan (MpegTSBase * base)
     mpegts_packetizer_push (base->packetizer, buf);
 
     if (mpegts_packetizer_has_packets (base->packetizer)) {
-      /* Mark the initial sync point and remember the packetsize */
-      base->initial_sync_point = base->seek_offset = base->packetizer->offset;
-      GST_DEBUG ("Sync point is now %" G_GUINT64_FORMAT, base->seek_offset);
-      base->packetsize = base->packetizer->packet_size;
+      if (base->seek_offset == -1) {
+        /* Mark the initial sync point and remember the packetsize */
+        base->seek_offset = base->packetizer->offset;
+        GST_DEBUG ("Sync point is now %" G_GUINT64_FORMAT, base->seek_offset);
+        base->packetsize = base->packetizer->packet_size;
+      }
+      while (1) {
+        /* Eat up all packets */
+        pret = mpegts_packetizer_process_next_packet (base->packetizer);
+        if (pret == PACKET_NEED_MORE)
+          break;
+        if (pret != PACKET_BAD &&
+            mpegts_packetizer_get_seen_pcr (base->packetizer) >= 5) {
+          GST_DEBUG ("Got enough initial PCR");
+          done = TRUE;
+          break;
+        }
+      }
+    }
+  }
+
+  initial_pcr_seen = mpegts_packetizer_get_seen_pcr (base->packetizer);
+  if (G_UNLIKELY (initial_pcr_seen == 0))
+    goto no_initial_pcr;
+  GST_DEBUG ("Seen %d initial PCR", initial_pcr_seen);
 
-      /* If the subclass can seek for timestamps, do that */
-      if (klass->find_timestamps) {
-        guint64 offset;
-        mpegts_packetizer_clear (base->packetizer);
+  /* Now send data from the end */
+  mpegts_packetizer_clear (base->packetizer);
 
-        ret = klass->find_timestamps (base, 0, &offset);
+  /* Get the size of upstream */
+  format = GST_FORMAT_BYTES;
+  if (!gst_pad_query_peer_duration (base->sinkpad, &format, &tmpval))
+    goto beach;
+  upstream_size = tmpval;
+  done = FALSE;
 
-        base->initial_sync_point = base->seek_offset =
-            base->packetizer->offset = base->first_pat_offset;
-        GST_DEBUG ("Sync point is now %" G_GUINT64_FORMAT, base->seek_offset);
-      }
+  /* Find last PCR value */
+  for (seek_pos = MAX (0, upstream_size - 655360);
+      seek_pos < upstream_size && !done; seek_pos += 65536) {
+    GST_DEBUG ("Grabbing %d => %d", seek_pos, 65536);
+
+    ret = gst_pad_pull_range (base->sinkpad, seek_pos, 65536, &buf);
+    if (G_UNLIKELY (ret != GST_FLOW_OK))
       goto beach;
+
+    /* Push to packetizer */
+    mpegts_packetizer_push (base->packetizer, buf);
+
+    if (mpegts_packetizer_has_packets (base->packetizer)) {
+      while (1) {
+        /* Eat up all packets */
+        pret = mpegts_packetizer_process_next_packet (base->packetizer);
+        if (pret == PACKET_NEED_MORE)
+          break;
+        if (pret != PACKET_BAD &&
+            mpegts_packetizer_get_seen_pcr (base->packetizer) >
+            initial_pcr_seen) {
+          GST_DEBUG ("Got last PCR");
+          done = TRUE;
+          break;
+        }
+      }
     }
   }
 
-  GST_WARNING ("Didn't find initial sync point");
-  ret = GST_FLOW_ERROR;
-
 beach:
   mpegts_packetizer_clear (base->packetizer);
   return ret;
 
+no_initial_pcr:
+  mpegts_packetizer_clear (base->packetizer);
+  GST_WARNING_OBJECT (base, "Couldn't find any PCR within the first %d bytes",
+      10 * 65536);
+  return GST_FLOW_ERROR;
 }
 
 
@@ -1409,7 +1448,7 @@ mpegts_base_loop (MpegTSBase * base)
       GST_DEBUG ("Changing to Streaming");
       break;
     case BASE_MODE_SEEKING:
-      /* FIXME : yes, we should do something here */
+      /* FIXME : unclear if we still need mode_seeking... */
       base->mode = BASE_MODE_STREAMING;
       break;
     case BASE_MODE_STREAMING:
@@ -1492,6 +1531,8 @@ mpegts_base_handle_seek_event (MpegTSBase * base, GstPad * pad,
   flush = flags & GST_SEEK_FLAG_FLUSH;
 
   if (base->mode == BASE_MODE_PUSHING) {
+    /* FIXME : Actually ... it is supported, we just need to convert
+     * the seek event to BYTES */
     GST_ERROR ("seeking in push mode not supported");
     goto push_mode;
   }
@@ -1505,6 +1546,7 @@ mpegts_base_handle_seek_event (MpegTSBase * base, GstPad * pad,
         gst_event_new_flush_start ());
   } else
     gst_pad_pause_task (base->sinkpad);
+
   /* wait for streaming to finish */
   GST_PAD_STREAM_LOCK (base->sinkpad);
 
@@ -1512,6 +1554,8 @@ mpegts_base_handle_seek_event (MpegTSBase * base, GstPad * pad,
     /* send a FLUSH_STOP for the sinkpad, since we need data for seeking */
     GST_DEBUG_OBJECT (base, "sending flush stop");
     gst_pad_push_event (base->sinkpad, gst_event_new_flush_stop ());
+    /* And actually flush our pending data */
+    mpegts_base_flush (base);
   }
 
   if (flags & (GST_SEEK_FLAG_SEGMENT | GST_SEEK_FLAG_SKIP)) {
@@ -1526,11 +1570,9 @@ mpegts_base_handle_seek_event (MpegTSBase * base, GstPad * pad,
       ret = klass->seek (base, event);
       if (G_UNLIKELY (ret != GST_FLOW_OK)) {
         GST_WARNING ("seeking failed %s", gst_flow_get_name (ret));
-        goto done;
       }
     } else {
       GST_WARNING ("subclass has no seek implementation");
-      goto done;
     }
   }
 
@@ -1568,6 +1610,7 @@ mpegts_base_sink_activate_pull (GstPad * pad, gboolean active)
   MpegTSBase *base = GST_MPEGTS_BASE (GST_OBJECT_PARENT (pad));
   if (active) {
     base->mode = BASE_MODE_SCANNING;
+    base->packetizer->calculate_offset = TRUE;
     return gst_pad_start_task (pad, (GstTaskFunction) mpegts_base_loop, base);
   } else
     return gst_pad_stop_task (pad);
@@ -1578,6 +1621,7 @@ mpegts_base_sink_activate_push (GstPad * pad, gboolean active)
 {
   MpegTSBase *base = GST_MPEGTS_BASE (GST_OBJECT_PARENT (pad));
   base->mode = BASE_MODE_PUSHING;
+  base->packetizer->calculate_skew = TRUE;
   return TRUE;
 }
 
index 9e6363e..b0183df 100644 (file)
@@ -78,6 +78,8 @@ struct _MpegTSBaseProgram
 
   /* TRUE if the program is currently being used */
   gboolean active;
+  /* TRUE if this is the first program created */
+  gboolean initial_program;
 };
 
 typedef enum {
@@ -98,9 +100,6 @@ struct _MpegTSBase {
   /* pull-based behaviour */
   MpegTSBaseMode mode;
 
-  /* location of first sync point */
-  guint64      initial_sync_point;
-
   /* Current pull offset (also set by seek handler) */
   guint64      seek_offset;
 
@@ -132,13 +131,6 @@ struct _MpegTSBase {
   /* Whether we saw a PAT yet */
   gboolean seen_pat;
 
-  /* Offset from the origin to the first PAT (pullmode) */
-  guint64    first_pat_offset;
-
-  /* interpolation gap between the upstream timestamp and the pts */
-  GstClockTime in_gap;
-  GstClockTime first_buf_ts;
-
   /* Whether upstream is live or not */
   gboolean upstream_live;
   /* Whether we queried the upstream latency or not */
index 812addb..c941be9 100644 (file)
 /* seek to SEEK_TIMESTAMP_OFFSET before the desired offset and search then
  * either accurately or for the next timestamp
  */
-#define SEEK_TIMESTAMP_OFFSET (1000 * GST_MSECOND)
+#define SEEK_TIMESTAMP_OFFSET (500 * GST_MSECOND)
 
+#define SEGMENT_FORMAT "[format:%s, rate:%f, start:%"                  \
+  GST_TIME_FORMAT", stop:%"GST_TIME_FORMAT", time:%"GST_TIME_FORMAT    \
+  ", accum:%"GST_TIME_FORMAT", last_stop:%"GST_TIME_FORMAT             \
+  ", duration:%"GST_TIME_FORMAT"]"
+
+#define SEGMENT_ARGS(a) gst_format_get_name((a).format), (a).rate,     \
+    GST_TIME_ARGS((a).start), GST_TIME_ARGS((a).stop),                 \
+    GST_TIME_ARGS((a).time), GST_TIME_ARGS((a).accum),                 \
+    GST_TIME_ARGS((a).last_stop), GST_TIME_ARGS((a).duration)
 
 
 GST_DEBUG_CATEGORY_STATIC (ts_demux_debug);
@@ -128,9 +137,15 @@ struct _TSDemuxStream
   /* Raw value of current PTS/DTS */
   guint64 raw_pts;
   guint64 raw_dts;
+  /* PTS/DTS with rollover fixed */
+  guint64 fixed_pts;
+  guint64 fixed_dts;
   /* Number of rollover seen for PTS/DTS (default:0) */
   guint nb_pts_rollover;
   guint nb_dts_rollover;
+
+  /* Whether this stream needs to send a newsegment */
+  gboolean need_newsegment;
 };
 
 #define VIDEO_CAPS \
@@ -204,8 +219,6 @@ enum
 };
 
 /* Pad functions */
-static const GstQueryType *gst_ts_demux_srcpad_query_types (GstPad * pad);
-static gboolean gst_ts_demux_srcpad_query (GstPad * pad, GstQuery * query);
 
 
 /* mpegtsbase methods */
@@ -304,12 +317,19 @@ gst_ts_demux_reset (MpegTSBase * base)
 {
   GstTSDemux *demux = (GstTSDemux *) base;
 
-  demux->need_newsegment = TRUE;
   demux->program_number = -1;
-
-  demux->duration = GST_CLOCK_TIME_NONE;
+  demux->calculate_update_segment = FALSE;
 
   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
+  if (demux->segment_event) {
+    gst_event_unref (demux->segment_event);
+    demux->segment_event = NULL;
+  }
+
+  if (demux->update_segment) {
+    gst_event_unref (demux->update_segment);
+    demux->update_segment = NULL;
+  }
 }
 
 static void
@@ -389,9 +409,21 @@ gst_ts_demux_srcpad_query (GstPad * pad, GstQuery * query)
       GST_DEBUG ("query duration");
       gst_query_parse_duration (query, &format, NULL);
       if (format == GST_FORMAT_TIME) {
-        if (!gst_pad_peer_query (base->sinkpad, query))
-          gst_query_set_duration (query, GST_FORMAT_TIME,
-              demux->segment.duration);
+        if (!gst_pad_peer_query (base->sinkpad, query)) {
+          gint64 val;
+
+          format = GST_FORMAT_BYTES;
+          if (!gst_pad_query_peer_duration (base->sinkpad, &format, &val))
+            res = FALSE;
+          else {
+            GstClockTime dur =
+                mpegts_packetizer_offset_to_ts (base->packetizer, val);
+            if (GST_CLOCK_TIME_IS_VALID (dur))
+              gst_query_set_duration (query, GST_FORMAT_TIME, dur);
+            else
+              res = FALSE;
+          }
+        }
       } else {
         GST_DEBUG_OBJECT (demux, "only query duration on TIME is supported");
         res = FALSE;
@@ -454,13 +486,6 @@ gst_ts_demux_srcpad_query (GstPad * pad, GstQuery * query)
 }
 
 static GstFlowReturn
-gst_ts_demux_perform_seek (MpegTSBase * base, GstSegment * segment)
-{
-  return GST_FLOW_ERROR;
-}
-
-
-static GstFlowReturn
 gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event)
 {
   GstTSDemux *demux = (GstTSDemux *) base;
@@ -472,6 +497,7 @@ gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event)
   gint64 start, stop;
   GstSegment seeksegment;
   gboolean update;
+  guint64 start_offset;
 
   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
       &stop_type, &stop);
@@ -492,31 +518,35 @@ gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event)
   /* copy segment, we need this because we still need the old
    * segment when we close the current segment. */
   memcpy (&seeksegment, &demux->segment, sizeof (GstSegment));
+  if (demux->segment_event) {
+    gst_event_unref (demux->segment_event);
+    demux->segment_event = NULL;
+  }
   /* configure the segment with the seek variables */
   GST_DEBUG_OBJECT (demux, "configuring seek");
-  GST_DEBUG ("seeksegment: start: %" GST_TIME_FORMAT " stop: %"
-      GST_TIME_FORMAT " time: %" GST_TIME_FORMAT " accum: %" GST_TIME_FORMAT
-      " last_stop: %" GST_TIME_FORMAT " duration: %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (seeksegment.start), GST_TIME_ARGS (seeksegment.stop),
-      GST_TIME_ARGS (seeksegment.time), GST_TIME_ARGS (seeksegment.accum),
-      GST_TIME_ARGS (seeksegment.last_stop),
-      GST_TIME_ARGS (seeksegment.duration));
+  GST_DEBUG ("seeksegment before set_seek " SEGMENT_FORMAT,
+      SEGMENT_ARGS (seeksegment));
+
   gst_segment_set_seek (&seeksegment, rate, format, flags, start_type, start,
       stop_type, stop, &update);
-  GST_DEBUG ("seeksegment: start: %" GST_TIME_FORMAT " stop: %"
-      GST_TIME_FORMAT " time: %" GST_TIME_FORMAT " accum: %" GST_TIME_FORMAT
-      " last_stop: %" GST_TIME_FORMAT " duration: %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (seeksegment.start), GST_TIME_ARGS (seeksegment.stop),
-      GST_TIME_ARGS (seeksegment.time), GST_TIME_ARGS (seeksegment.accum),
-      GST_TIME_ARGS (seeksegment.last_stop),
-      GST_TIME_ARGS (seeksegment.duration));
-
-  res = gst_ts_demux_perform_seek (base, &seeksegment);
-  if (G_UNLIKELY (res != GST_FLOW_OK)) {
-    GST_WARNING ("seeking failed %s", gst_flow_get_name (res));
+
+  GST_DEBUG ("seeksegment after set_seek " SEGMENT_FORMAT,
+      SEGMENT_ARGS (seeksegment));
+
+  /* Convert start/stop to offset */
+  start_offset =
+      mpegts_packetizer_ts_to_offset (base->packetizer, MAX (0,
+          start - SEEK_TIMESTAMP_OFFSET));
+
+  if (G_UNLIKELY (start_offset == -1)) {
+    GST_WARNING ("Couldn't convert start position to an offset");
     goto done;
   }
 
+  /* record offset */
+  base->seek_offset = start_offset;
+  res = GST_FLOW_OK;
+
   /* commit the new segment */
   memcpy (&demux->segment, &seeksegment, sizeof (GstSegment));
 
@@ -927,10 +957,13 @@ gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * bstream,
     if (bstream->stream_type != 0xff)
       stream->pad = create_pad_for_stream (base, bstream, program);
 
+    stream->need_newsegment = TRUE;
     stream->pts = GST_CLOCK_TIME_NONE;
     stream->dts = GST_CLOCK_TIME_NONE;
     stream->raw_pts = 0;
     stream->raw_dts = 0;
+    stream->fixed_pts = 0;
+    stream->fixed_dts = 0;
     stream->nb_pts_rollover = 0;
     stream->nb_dts_rollover = 0;
   }
@@ -940,24 +973,14 @@ gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * bstream,
 static void
 gst_ts_demux_stream_removed (MpegTSBase * base, MpegTSBaseStream * bstream)
 {
-  GstTSDemux *demux = GST_TS_DEMUX (base);
   TSDemuxStream *stream = (TSDemuxStream *) bstream;
 
   if (stream->pad) {
     if (gst_pad_is_active (stream->pad)) {
-      gboolean need_newsegment = demux->need_newsegment;
-
-      /* We must not send the newsegment when flushing the pending data
-         on the removed stream. We should only push it when the newly added
-         stream finishes parsing its PTS */
-      demux->need_newsegment = FALSE;
-
       /* Flush out all data */
       GST_DEBUG_OBJECT (stream->pad, "Flushing out pending data");
       gst_ts_demux_push_pending_data ((GstTSDemux *) base, stream);
 
-      demux->need_newsegment = need_newsegment;
-
       GST_DEBUG_OBJECT (stream->pad, "Pushing out EOS");
       gst_pad_push_event (stream->pad, gst_event_new_eos ());
       GST_DEBUG_OBJECT (stream->pad, "Deactivating and removing pad");
@@ -998,6 +1021,15 @@ gst_ts_demux_stream_flush (TSDemuxStream * stream)
   stream->nbpending = 0;
 
   stream->current = NULL;
+  stream->need_newsegment = TRUE;
+  stream->pts = GST_CLOCK_TIME_NONE;
+  stream->dts = GST_CLOCK_TIME_NONE;
+  stream->raw_pts = 0;
+  stream->raw_dts = 0;
+  stream->fixed_pts = 0;
+  stream->fixed_dts = 0;
+  stream->nb_pts_rollover = 0;
+  stream->nb_dts_rollover = 0;
 }
 
 static void
@@ -1023,6 +1055,20 @@ gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
     demux->program_number = program->program_number;
     demux->program = program;
 
+    /* If this is not the initial program, we need to calculate
+     * an update newsegment */
+    demux->calculate_update_segment = !program->initial_program;
+
+    /* If we have an upstream time segment and it's the initial program, just use that */
+    if (program->initial_program && base->segment.format == GST_FORMAT_TIME) {
+      demux->segment = base->segment;
+      demux->segment_event =
+          gst_event_new_new_segment_full (FALSE, base->segment.rate,
+          base->segment.applied_rate, GST_FORMAT_TIME, base->segment.start,
+          base->segment.stop, base->segment.time);
+      GST_EVENT_SRC (demux->segment_event) = gst_object_ref (demux);
+    }
+
     /* Activate all stream pads, pads will already have been created */
     if (base->mode != BASE_MODE_SCANNING) {
       for (tmp = program->stream_list; tmp; tmp = tmp->next)
@@ -1032,7 +1078,6 @@ gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
 
     /* Inform scanner we have got our program */
     demux->current_program_number = program->program_number;
-    demux->need_newsegment = TRUE;
   }
 }
 
@@ -1113,8 +1158,8 @@ gst_ts_demux_record_pts (GstTSDemux * demux, TSDemuxStream * stream,
 
   /* Compute PTS in GstClockTime */
   stream->raw_pts = pts;
-  stream->pts =
-      MPEGTIME_TO_GSTTIME (pts + stream->nb_pts_rollover * PTS_DTS_MAX_VALUE);
+  stream->fixed_pts = pts + stream->nb_pts_rollover * PTS_DTS_MAX_VALUE;
+  stream->pts = MPEGTIME_TO_GSTTIME (stream->fixed_pts);
 
   GST_LOG ("pid 0x%04x Stored PTS %" G_GUINT64_FORMAT " (%" GST_TIME_FORMAT ")",
       bs->pid, stream->raw_pts, GST_TIME_ARGS (stream->pts));
@@ -1159,8 +1204,8 @@ gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream,
 
   /* Compute DTS in GstClockTime */
   stream->raw_dts = dts;
-  stream->dts =
-      MPEGTIME_TO_GSTTIME (dts + stream->nb_dts_rollover * PTS_DTS_MAX_VALUE);
+  stream->fixed_dts = dts + stream->nb_dts_rollover * PTS_DTS_MAX_VALUE;
+  stream->dts = MPEGTIME_TO_GSTTIME (stream->fixed_dts);
 
   GST_LOG ("pid 0x%04x Stored DTS %" G_GUINT64_FORMAT " (%" GST_TIME_FORMAT ")",
       bs->pid, stream->raw_dts, GST_TIME_ARGS (stream->dts));
@@ -1215,46 +1260,12 @@ gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream)
         GST_TIME_ARGS (stream->pts),
         GST_TIME_ARGS (MPEGTIME_TO_GSTTIME (header.DTS)));
 
-    /* safe default if insufficient upstream info */
-    if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (base->in_gap) &&
-            GST_CLOCK_TIME_IS_VALID (base->first_buf_ts) &&
-            base->mode == BASE_MODE_PUSHING &&
-            base->segment.format == GST_FORMAT_TIME)) {
-      /* Find the earliest current PTS we're going to push */
-      GstClockTime firstpts = GST_CLOCK_TIME_NONE;
-      GList *tmp;
-
-      for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
-        TSDemuxStream *pstream = (TSDemuxStream *) tmp->data;
-        if (!GST_CLOCK_TIME_IS_VALID (firstpts) || pstream->pts < firstpts)
-          firstpts = pstream->pts;
-      }
-
-      base->in_gap = base->first_buf_ts - firstpts;
-      GST_DEBUG_OBJECT (base, "upstream segment start %" GST_TIME_FORMAT
-          ", first buffer timestamp: %" GST_TIME_FORMAT
-          ", first PTS: %" GST_TIME_FORMAT
-          ", interpolation gap: %" GST_TIME_FORMAT,
-          GST_TIME_ARGS (base->segment.start),
-          GST_TIME_ARGS (base->first_buf_ts), GST_TIME_ARGS (firstpts),
-          GST_TIME_ARGS (base->in_gap));
-    }
-
-    if (!GST_CLOCK_TIME_IS_VALID (base->in_gap))
-      base->in_gap = 0;
-
-    if (base->upstream_live) {
+    {
       MpegTSPacketizer2 *packetizer = base->packetizer;
 
-      if (GST_CLOCK_TIME_IS_VALID (packetizer->base_time))
-        GST_BUFFER_TIMESTAMP (stream->pendingbuffers[0]) =
-            stream->pts - packetizer->base_pcrtime + packetizer->base_time +
-            packetizer->skew;
-      else
-        GST_BUFFER_TIMESTAMP (stream->pendingbuffers[0]) = GST_CLOCK_TIME_NONE;
-    } else
       GST_BUFFER_TIMESTAMP (stream->pendingbuffers[0]) =
-          stream->pts + base->in_gap;
+          mpegts_packetizer_pts_to_ts (packetizer, stream->pts);
+    }
     GST_DEBUG ("buf %" GST_TIME_FORMAT,
         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (stream->pendingbuffers[0])));
   }
@@ -1346,85 +1357,77 @@ static void
 calculate_and_push_newsegment (GstTSDemux * demux, TSDemuxStream * stream)
 {
   MpegTSBase *base = (MpegTSBase *) demux;
-  GstEvent *newsegmentevent;
-  gint64 start = 0, stop = GST_CLOCK_TIME_NONE, position = 0;
-  GstClockTime firstpts = GST_CLOCK_TIME_NONE;
+  GstClockTime lowest_pts = GST_CLOCK_TIME_NONE;
+  GstClockTime firstts = GST_CLOCK_TIME_NONE;
   GList *tmp;
 
   GST_DEBUG ("Creating new newsegment for stream %p", stream);
 
-  /* Outgoing newsegment values
-   * start    : The first/start PTS
-   * stop     : The last PTS (or -1)
-   * position : The stream time corresponding to start
-   *
-   * Except for live mode with incoming GST_TIME_FORMAT newsegment where
-   * it is the same values as that incoming newsegment (and we convert the
-   * PTS to that remote clock).
-   */
+  /* 1) If we need to calculate an update newsegment, do it
+   * 2) If we need to calculate a new newsegment, do it
+   * 3) If an update_segment is valid, push it
+   * 4) If a newsegment is valid, push it */
 
+  /* Speedup : if we don't need to calculate anything, go straight to pushing */
+  if (!demux->calculate_update_segment && demux->segment_event)
+    goto push_new_segment;
+
+  /* Calculate the 'new_start' value, used for both updates and newsegment */
   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
     TSDemuxStream *pstream = (TSDemuxStream *) tmp->data;
 
-    if (!GST_CLOCK_TIME_IS_VALID (firstpts) || pstream->pts < firstpts)
-      firstpts = pstream->pts;
-  }
-
-  if (base->mode == BASE_MODE_PUSHING) {
-    /* FIXME : We're just ignore the upstream format for the time being */
-    /* FIXME : We should use base->segment.format and a upstream latency query
-     * to decide if we need to use live values or not */
-    GST_DEBUG ("push-based. base Segment start:%" GST_TIME_FORMAT " duration:%"
-        GST_TIME_FORMAT ", stop:%" GST_TIME_FORMAT ", time:%" GST_TIME_FORMAT,
-        GST_TIME_ARGS (base->segment.start),
-        GST_TIME_ARGS (base->segment.duration),
-        GST_TIME_ARGS (base->segment.stop), GST_TIME_ARGS (base->segment.time));
-    GST_DEBUG ("push-based. demux Segment start:%" GST_TIME_FORMAT " duration:%"
-        GST_TIME_FORMAT ", stop:%" GST_TIME_FORMAT ", time:%" GST_TIME_FORMAT,
-        GST_TIME_ARGS (demux->segment.start),
-        GST_TIME_ARGS (demux->segment.duration),
-        GST_TIME_ARGS (demux->segment.stop),
-        GST_TIME_ARGS (demux->segment.time));
-
-    GST_DEBUG ("stream pts: %" GST_TIME_FORMAT " first pts: %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (stream->pts), GST_TIME_ARGS (firstpts));
-
-    if (base->segment.format == GST_FORMAT_TIME) {
-      start = base->segment.start;
-      stop = base->segment.stop;
+    if (GST_CLOCK_TIME_IS_VALID (pstream->pts)) {
+      if (!GST_CLOCK_TIME_IS_VALID (lowest_pts) || pstream->pts < lowest_pts)
+        lowest_pts = pstream->pts;
     }
-    if (!base->upstream_live) {
-      /* Shift the start depending on our position in the stream */
-      start += firstpts + base->in_gap - base->first_buf_ts;
+    if (GST_CLOCK_TIME_IS_VALID (pstream->dts)) {
+      if (!GST_CLOCK_TIME_IS_VALID (lowest_pts) || pstream->dts < lowest_pts)
+        lowest_pts = pstream->dts;
     }
-    position = start;
-  } else {
-    /* pull mode */
-    GST_DEBUG ("pull-based. Segment start:%" GST_TIME_FORMAT " duration:%"
-        GST_TIME_FORMAT ", time:%" GST_TIME_FORMAT,
-        GST_TIME_ARGS (demux->segment.start),
-        GST_TIME_ARGS (demux->segment.duration),
-        GST_TIME_ARGS (demux->segment.time));
-
-    /* FIXME : This is not entirely correct. We should be using the PTS time
-     * realm and not the PCR one. Doesn't matter *too* much if PTS/PCR values
-     * aren't too far apart, but still.  */
-    /* FIXME : EDWARD : Removed previous first pcr gsttime */
-    start = demux->segment.start;
-    stop = demux->segment.duration;
-    position = demux->segment.time;
+  }
+  if (GST_CLOCK_TIME_IS_VALID (lowest_pts))
+    firstts = mpegts_packetizer_pts_to_ts (base->packetizer, lowest_pts);
+  GST_DEBUG ("lowest_pts %" G_GUINT64_FORMAT " => clocktime %" GST_TIME_FORMAT,
+      lowest_pts, GST_TIME_ARGS (firstts));
+
+  if (demux->calculate_update_segment) {
+    GST_DEBUG ("Calculating update segment");
+    /* If we have a valid segment, create an update of that */
+    if (demux->segment.format == GST_FORMAT_TIME) {
+      GST_DEBUG ("Re-using segment " SEGMENT_FORMAT,
+          SEGMENT_ARGS (demux->segment));
+      demux->update_segment =
+          gst_event_new_new_segment_full (TRUE, demux->segment.rate,
+          demux->segment.applied_rate, GST_FORMAT_TIME, demux->segment.start,
+          firstts, demux->segment.time);
+      GST_EVENT_SRC (demux->update_segment) = gst_object_ref (demux);
+    }
+    demux->calculate_update_segment = FALSE;
   }
 
-  GST_DEBUG ("new segment:   start: %" GST_TIME_FORMAT " stop: %"
-      GST_TIME_FORMAT " time: %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
-      GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
-  newsegmentevent =
-      gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME, start, stop,
-      position);
+  if (!demux->segment_event) {
+    GST_DEBUG ("Calculating actual segment");
+    /* FIXME : Set proper values */
+    demux->segment_event =
+        gst_event_new_new_segment_full (FALSE, 1.0, 1.0, GST_FORMAT_TIME,
+        firstts, GST_CLOCK_TIME_NONE, firstts);
+    GST_EVENT_SRC (demux->segment_event) = gst_object_ref (demux);
+  }
 
-  push_event ((MpegTSBase *) demux, newsegmentevent);
+push_new_segment:
+  if (demux->update_segment) {
+    GST_DEBUG_OBJECT (stream->pad, "Pushing update segment");
+    gst_event_ref (demux->update_segment);
+    gst_pad_push_event (stream->pad, demux->update_segment);
+  }
 
-  demux->need_newsegment = FALSE;
+  if (demux->segment_event) {
+    GST_DEBUG_OBJECT (stream->pad, "Pushing newsegment event");
+    gst_event_ref (demux->segment_event);
+    gst_pad_push_event (stream->pad, demux->segment_event);
+  }
+
+  stream->need_newsegment = FALSE;
 }
 
 static GstFlowReturn
@@ -1461,7 +1464,7 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
     goto beach;
   }
 
-  if (G_UNLIKELY (demux->need_newsegment))
+  if (G_UNLIKELY (stream->need_newsegment))
     calculate_and_push_newsegment (demux, stream);
 
   /* We have a confirmed buffer, let's push it out */
@@ -1475,12 +1478,9 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
   GST_DEBUG_OBJECT (stream->pad, "stream->pts %" GST_TIME_FORMAT,
       GST_TIME_ARGS (stream->pts));
   if (GST_CLOCK_TIME_IS_VALID (stream->pts)
-      && !GST_CLOCK_TIME_IS_VALID (GST_BUFFER_TIMESTAMP (firstbuffer))
-      && GST_CLOCK_TIME_IS_VALID (packetizer->base_time)) {
+      && !GST_CLOCK_TIME_IS_VALID (GST_BUFFER_TIMESTAMP (firstbuffer)))
     GST_BUFFER_TIMESTAMP (firstbuffer) =
-        stream->pts - packetizer->base_pcrtime + packetizer->base_time +
-        packetizer->skew;
-  }
+        mpegts_packetizer_pts_to_ts (packetizer, stream->pts);
 
   GST_DEBUG_OBJECT (stream->pad,
       "Pushing buffer list with timestamp: %" GST_TIME_FORMAT,
@@ -1493,7 +1493,7 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
 
 beach:
   /* Reset everything */
-  GST_LOG ("Resetting to EMPTY");
+  GST_LOG ("Resetting to EMPTY, returning %s", gst_flow_get_name (res));
   stream->state = PENDING_PACKET_EMPTY;
   memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
   stream->nbpending = 0;
@@ -1536,7 +1536,7 @@ gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream,
           GST_BUFFER_OFFSET (packet->buffer));
   }
 
-  if (packet->payload)
+  if (packet->payload && (res == GST_FLOW_OK || res == GST_FLOW_NOT_LINKED))
     gst_ts_demux_queue_data (demux, stream, packet);
   else
     gst_buffer_unref (packet->buffer);
@@ -1549,7 +1549,6 @@ gst_ts_demux_flush (MpegTSBase * base)
 {
   GstTSDemux *demux = GST_TS_DEMUX_CAST (base);
 
-  demux->need_newsegment = TRUE;
   gst_ts_demux_flush_streams (demux);
 }
 
index 4dff579..d23e9d2 100644 (file)
@@ -61,11 +61,18 @@ struct _GstTSDemux
   /*< private >*/
   MpegTSBaseProgram *program;  /* Current program */
   guint        current_program_number;
-  gboolean need_newsegment;
 
-  /* Downstream segment */
+  /* segments to be sent */
   GstSegment segment;
-  GstClockTime duration;       /* Total duration */
+  GstEvent *segment_event;
+
+  /* Set when program change */
+  gboolean calculate_update_segment;
+  /* update segment is */
+  GstEvent *update_segment;
+
+  /* Full stream duration */
+  GstClockTime duration;
 };
 
 struct _GstTSDemuxClass