mpegtsdemux: push based seeking based on PCR
authorJanne Grunau <janne.grunau@collabora.co.uk>
Tue, 22 Feb 2011 11:33:56 +0000 (12:33 +0100)
committerEdward Hervey <bilboed@bilboed.com>
Tue, 7 Jun 2011 18:50:34 +0000 (20:50 +0200)
buffer timestamps are converted to GstClockTime to cover pcr/pts wraps.
multiple pcr/pts wraps are handled with an index which ensures at most
a single pcr wraparound between two entries.
the last seen pcr is recorded to have a nearby index point for short seeks
resuming playback might be delayed if the postion is not a keyframe

TODO: replace manual packet scanning and parsing in the initial duration estimation

gst/mpegtsdemux/mpegtsbase.c
gst/mpegtsdemux/mpegtsbase.h
gst/mpegtsdemux/mpegtspacketizer.c
gst/mpegtsdemux/mpegtspacketizer.h
gst/mpegtsdemux/tsdemux.c
gst/mpegtsdemux/tsdemux.h

index f64b588..c7d2587 100644 (file)
@@ -187,6 +187,8 @@ mpegts_base_class_init (MpegTSBaseClass * klass)
 static void
 mpegts_base_reset (MpegTSBase * base)
 {
+  MpegTSBaseClass *klass = GST_MPEGTS_BASE_GET_CLASS (base);
+
   mpegts_packetizer_clear (base->packetizer);
   memset (base->is_pes, 0, 8192);
   memset (base->known_psi, 0, 8192);
@@ -206,6 +208,8 @@ mpegts_base_reset (MpegTSBase * base)
   /* base->pat = NULL; */
   /* pmt pids will be added and removed dynamically */
 
+  if (klass->reset)
+    klass->reset (base);
 }
 
 static void
@@ -1013,8 +1017,8 @@ mpegts_base_sink_event (GstPad * pad, GstEvent * event)
       gst_event_unref (event);
       res = FALSE;
       break;
-    case GST_EVENT_FLUSH_STOP:
-      mpegts_packetizer_clear (base->packetizer);
+    case GST_EVENT_FLUSH_START:
+      mpegts_packetizer_flush (base->packetizer);
       /* Passthrough */
     default:
       res = GST_MPEGTS_BASE_GET_CLASS (base)->push_event (base, event);
@@ -1181,6 +1185,9 @@ mpegts_base_loop (MpegTSBase * base)
         goto error;
     }
       break;
+    case BASE_MODE_PUSHING:
+      GST_WARNING ("wrong BASE_MODE_PUSHING mode in pull loop");
+      break;
   }
 
   return;
@@ -1201,6 +1208,92 @@ error:
   }
 }
 
+
+gboolean
+mpegts_base_handle_seek_event (MpegTSBase * base, GstPad * pad,
+    GstEvent * event)
+{
+  MpegTSBaseClass *klass = GST_MPEGTS_BASE_GET_CLASS (base);
+  GstFlowReturn ret = GST_FLOW_ERROR;
+  gdouble rate;
+  gboolean flush;
+  GstFormat format;
+  GstSeekFlags flags;
+  GstSeekType start_type, stop_type;
+  gint64 start, stop;
+
+  gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
+      &stop_type, &stop);
+
+  if (format != GST_FORMAT_TIME)
+    return FALSE;
+
+  GST_DEBUG ("seek event, rate: %f start: %" GST_TIME_FORMAT
+      " stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start),
+      GST_TIME_ARGS (stop));
+
+  flush = flags & GST_SEEK_FLAG_FLUSH;
+
+  if (base->mode == BASE_MODE_PUSHING) {
+    GST_ERROR ("seeking in push mode not supported");
+    goto done;
+  }
+
+  /* stop streaming, either by flushing or by pausing the task */
+  base->mode = BASE_MODE_SEEKING;
+  if (flush) {
+    GST_DEBUG_OBJECT (base, "sending flush start");
+    gst_pad_push_event (base->sinkpad, gst_event_new_flush_start ());
+    GST_MPEGTS_BASE_GET_CLASS (base)->push_event (base,
+        gst_event_new_flush_start ());
+  } else
+    gst_pad_pause_task (base->sinkpad);
+  /* wait for streaming to finish */
+  GST_PAD_STREAM_LOCK (base->sinkpad);
+
+  if (flush) {
+    /* send a FLUSH_STOP for the sinkpad, since we need data for seeking */
+    GST_DEBUG_OBJECT (base, "sending flush stop");
+    gst_pad_push_event (base->sinkpad, gst_event_new_flush_stop ());
+  }
+
+  if (flags & (GST_SEEK_FLAG_KEY_UNIT | GST_SEEK_FLAG_SEGMENT |
+          GST_SEEK_FLAG_SKIP)) {
+    GST_WARNING ("seek flags 0x%x are not supported", (int) flags);
+    goto done;
+  }
+
+
+  if (format == GST_FORMAT_TIME) {
+    /* If the subclass can seek, do that */
+    if (klass->seek) {
+      ret = klass->seek (base, event);
+      if (G_UNLIKELY (ret != GST_FLOW_OK)) {
+        GST_WARNING ("seeking failed %s", gst_flow_get_name (ret));
+        goto done;
+      }
+    } else {
+      GST_WARNING ("subclass has no seek implementation");
+      goto done;
+    }
+  }
+
+  if (flush) {
+    /* if we sent a FLUSH_START, we now send a FLUSH_STOP */
+    GST_DEBUG_OBJECT (base, "sending flush stop");
+    //gst_pad_push_event (base->sinkpad, gst_event_new_flush_stop ());
+    GST_MPEGTS_BASE_GET_CLASS (base)->push_event (base,
+        gst_event_new_flush_stop ());
+  }
+  //else
+done:
+  gst_pad_start_task (base->sinkpad, (GstTaskFunction) mpegts_base_loop, base);
+
+  GST_PAD_STREAM_UNLOCK (base->sinkpad);
+  return ret == GST_FLOW_OK;
+}
+
+
 static gboolean
 mpegts_base_sink_activate (GstPad * pad)
 {
@@ -1227,6 +1320,8 @@ mpegts_base_sink_activate_pull (GstPad * pad, gboolean active)
 static gboolean
 mpegts_base_sink_activate_push (GstPad * pad, gboolean active)
 {
+  MpegTSBase *base = GST_MPEGTS_BASE (GST_OBJECT_PARENT (pad));
+  base->mode = BASE_MODE_PUSHING;
   return TRUE;
 }
 
@@ -1243,6 +1338,8 @@ mpegts_base_change_state (GstElement * element, GstStateChange transition)
   switch (transition) {
     case GST_STATE_CHANGE_PAUSED_TO_READY:
       mpegts_base_reset (base);
+      if (base->mode != BASE_MODE_PUSHING)
+        base->mode = BASE_MODE_SCANNING;
       break;
     default:
       break;
index 168f2ff..01de54e 100644 (file)
@@ -74,7 +74,8 @@ struct _MpegTSBaseProgram
 typedef enum {
   BASE_MODE_SCANNING,
   BASE_MODE_SEEKING,
-  BASE_MODE_STREAMING
+  BASE_MODE_STREAMING,
+  BASE_MODE_PUSHING
 } MpegTSBaseMode;
 
 struct _MpegTSBase {
@@ -124,6 +125,7 @@ struct _MpegTSBaseClass {
   GstElementClass parent_class;
 
   /* Virtual methods */
+  void (*reset) (MpegTSBase *base);
   GstFlowReturn (*push) (MpegTSBase *base, MpegTSPacketizerPacket *packet, MpegTSPacketizerSection * section);
   gboolean (*push_event) (MpegTSBase *base, GstEvent * event);
   /* program_started gets called when program's pmt arrives for first time */
@@ -139,6 +141,9 @@ struct _MpegTSBaseClass {
   /* find_timestamps is called to find PCR */
  GstFlowReturn (*find_timestamps) (MpegTSBase * base, guint64 initoff, guint64 *offset);
 
+  /* seek is called to wait for seeking */
+  GstFlowReturn (*seek) (MpegTSBase * base, GstEvent * event);
+
   /* signals */
   void (*pat_info) (GstStructure *pat);
   void (*pmt_info) (GstStructure *pmt);
@@ -155,6 +160,8 @@ MpegTSBaseProgram *mpegts_base_add_program (MpegTSBase * base, gint program_numb
 guint8 *mpegts_get_descriptor_from_stream (MpegTSBaseStream * stream, guint8 tag);
 guint8 *mpegts_get_descriptor_from_program (MpegTSBaseProgram * program, guint8 tag);
 
+gboolean
+mpegts_base_handle_seek_event(MpegTSBase * base, GstPad * pad, GstEvent * event);
 
 gboolean gst_mpegtsbase_plugin_init (GstPlugin * plugin);
 
index 6739c0a..4616906 100644 (file)
@@ -2087,6 +2087,24 @@ mpegts_packetizer_clear (MpegTSPacketizer2 * packetizer)
 }
 
 void
+mpegts_packetizer_flush (MpegTSPacketizer2 * packetizer)
+{
+  if (packetizer->streams) {
+    int i;
+    for (i = 0; i < 8192; i++) {
+      if (packetizer->streams[i]) {
+        gst_adapter_flush (packetizer->streams[i]->section_adapter,
+            packetizer->streams[i]->section_adapter->size);
+      }
+    }
+  }
+  gst_adapter_flush (packetizer->adapter, packetizer->adapter->size);
+
+  packetizer->offset = 0;
+  packetizer->empty = TRUE;
+}
+
+void
 mpegts_packetizer_remove_stream (MpegTSPacketizer2 * packetizer, gint16 pid)
 {
   MpegTSPacketizerStream *stream = packetizer->streams[pid];
index f40189b..8328625 100644 (file)
@@ -138,6 +138,7 @@ GType mpegts_packetizer_get_type(void);
 
 MpegTSPacketizer2 *mpegts_packetizer_new (void);
 void mpegts_packetizer_clear (MpegTSPacketizer2 *packetizer);
+void mpegts_packetizer_flush (MpegTSPacketizer2 *packetizer);
 void mpegts_packetizer_push (MpegTSPacketizer2 *packetizer, GstBuffer *buffer);
 gboolean mpegts_packetizer_has_packets (MpegTSPacketizer2 *packetizer);
 MpegTSPacketizerPacketReturn mpegts_packetizer_next_packet (MpegTSPacketizer2 *packetizer,
index fe7fb02..3c18663 100644 (file)
@@ -30,6 +30,8 @@
 #include <stdlib.h>
 #include <string.h>
 
+#include <glib.h>
+
 #include "mpegtsbase.h"
 #include "tsdemux.h"
 #include "gstmpegdesc.h"
 /* Size of the pendingbuffers array. */
 #define TS_MAX_PENDING_BUFFERS 256
 
+#define PCR_WRAP_SIZE_128KBPS (((gint64)1490)*(1024*1024))
+/* small PCR for wrap detection */
+#define PCR_SMALL 17775000
+/* maximal PCR time */
+#define PCR_MAX_VALUE (((((guint64)1)<<33) * 300) + 298)
+
 GST_DEBUG_CATEGORY_STATIC (ts_demux_debug);
 #define GST_CAT_DEFAULT ts_demux_debug
 
@@ -173,6 +181,7 @@ static void
 gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program);
 static void
 gst_ts_demux_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program);
+static void gst_ts_demux_reset (MpegTSBase * base);
 static GstFlowReturn
 gst_ts_demux_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
     MpegTSPacketizerSection * section);
@@ -181,6 +190,10 @@ gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * stream,
     MpegTSBaseProgram * program);
 static void
 gst_ts_demux_stream_removed (MpegTSBase * base, MpegTSBaseStream * stream);
+static GstFlowReturn gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event);
+static GstFlowReturn
+find_pcr_packet (MpegTSBase * base, guint64 offset, gint64 length,
+    TSPcrOffset * pcroffset);
 static GstFlowReturn
 find_timestamps (MpegTSBase * base, guint64 initoff, guint64 * offset);
 static void gst_ts_demux_set_property (GObject * object, guint prop_id,
@@ -189,8 +202,9 @@ static void gst_ts_demux_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec);
 static void gst_ts_demux_finalize (GObject * object);
 static GstFlowReturn
-process_pcr (MpegTSBase * base, guint64 initoff, GstClockTime * pcr,
+process_pcr (MpegTSBase * base, guint64 initoff, TSPcrOffset * pcroffset,
     guint numpcr, gboolean isinitial);
+static void gst_ts_demux_flush_streams (GstTSDemux * tsdemux);
 static gboolean push_event (MpegTSBase * base, GstEvent * event);
 static void _extra_init (GType type);
 
@@ -254,6 +268,7 @@ gst_ts_demux_class_init (GstTSDemuxClass * klass)
 
 
   ts_class = GST_MPEGTS_BASE_CLASS (klass);
+  ts_class->reset = GST_DEBUG_FUNCPTR (gst_ts_demux_reset);
   ts_class->push = GST_DEBUG_FUNCPTR (gst_ts_demux_push);
   ts_class->push_event = GST_DEBUG_FUNCPTR (push_event);
   ts_class->program_started = GST_DEBUG_FUNCPTR (gst_ts_demux_program_started);
@@ -261,6 +276,7 @@ gst_ts_demux_class_init (GstTSDemuxClass * klass)
   ts_class->stream_added = gst_ts_demux_stream_added;
   ts_class->stream_removed = gst_ts_demux_stream_removed;
   ts_class->find_timestamps = GST_DEBUG_FUNCPTR (find_timestamps);
+  ts_class->seek = GST_DEBUG_FUNCPTR (gst_ts_demux_do_seek);
 }
 
 static void
@@ -270,6 +286,32 @@ gst_ts_demux_init (GstTSDemux * demux, GstTSDemuxClass * klass)
   demux->program_number = -1;
   demux->duration = GST_CLOCK_TIME_NONE;
   GST_MPEGTS_BASE (demux)->stream_size = sizeof (TSDemuxStream);
+  gst_segment_init (&demux->segment, GST_FORMAT_TIME);
+  demux->first_pcr = (TSPcrOffset) {
+  GST_CLOCK_TIME_NONE, 0, 0};
+  demux->cur_pcr = (TSPcrOffset) {
+  0};
+  demux->last_pcr = (TSPcrOffset) {
+  0};
+}
+
+static void
+gst_ts_demux_reset (MpegTSBase * base)
+{
+  GstTSDemux *demux = (GstTSDemux *) base;
+  g_array_free (demux->index, TRUE);
+  demux->index = NULL;
+  demux->index_size = 0;
+  demux->need_newsegment = TRUE;
+  demux->program_number = -1;
+  demux->duration = GST_CLOCK_TIME_NONE;
+  gst_segment_init (&demux->segment, GST_FORMAT_TIME);
+  demux->first_pcr = (TSPcrOffset) {
+  GST_CLOCK_TIME_NONE, 0, 0};
+  demux->cur_pcr = (TSPcrOffset) {
+  0};
+  demux->last_pcr = (TSPcrOffset) {
+  0};
 }
 
 static void
@@ -324,6 +366,7 @@ gst_ts_demux_srcpad_query_types (GstPad * pad)
 {
   static const GstQueryType query_types[] = {
     GST_QUERY_DURATION,
+    GST_QUERY_SEEKING,
     0
   };
 
@@ -334,40 +377,327 @@ static gboolean
 gst_ts_demux_srcpad_query (GstPad * pad, GstQuery * query)
 {
   gboolean res = TRUE;
+  GstFormat format;
   GstTSDemux *demux;
 
   demux = GST_TS_DEMUX (gst_pad_get_parent (pad));
 
   switch (GST_QUERY_TYPE (query)) {
     case GST_QUERY_DURATION:
-    {
-      GstFormat format;
-
+      GST_DEBUG ("query duration");
       gst_query_parse_duration (query, &format, NULL);
-      /* can only get position in time */
-      if (format != GST_FORMAT_TIME)
-        goto wrong_format;
-
-      gst_query_set_duration (query, GST_FORMAT_TIME, demux->duration);
+      if (format == GST_FORMAT_TIME) {
+        gst_query_set_duration (query, GST_FORMAT_TIME,
+            demux->segment.duration);
+      } else {
+        GST_DEBUG_OBJECT (demux, "only query duration on TIME is supported");
+        res = FALSE;
+      }
+      break;
+    case GST_QUERY_SEEKING:
+      GST_DEBUG ("query seeking");
+      gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
+      if (format == GST_FORMAT_TIME) {
+        gst_query_set_seeking (query, GST_FORMAT_TIME,
+            demux->parent.mode != BASE_MODE_PUSHING, 0,
+            demux->segment.duration);
+      } else {
+        GST_DEBUG_OBJECT (demux, "only TIME is supported for query seeking");
+        res = FALSE;
+      }
       break;
-    }
     default:
       res = gst_pad_query_default (pad, query);
-      break;
   }
 
-done:
   gst_object_unref (demux);
   return res;
 
-wrong_format:
-  {
-    GST_DEBUG_OBJECT (demux, "only query duration on TIME is supported");
-    res = FALSE;
+}
+
+static inline GstClockTime
+calculate_gsttime (TSPcrOffset * start, guint64 pcr)
+{
+
+  GstClockTime time = start->gsttime;
+
+  if (start->pcr > pcr)
+    time += PCRTIME_TO_GSTTIME (PCR_MAX_VALUE - start->pcr) +
+        PCRTIME_TO_GSTTIME (pcr);
+  else
+    time += PCRTIME_TO_GSTTIME (pcr - start->pcr);
+
+  return time;
+}
+
+
+static gint
+TSPcrOffset_find (gconstpointer a, gconstpointer b, gpointer user_data)
+{
+
+/*   GST_INFO ("a: %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT, */
+/*       GST_TIME_ARGS (((TSPcrOffset *) a)->gsttime), ((TSPcrOffset *) a)->offset); */
+/*   GST_INFO ("b: %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT, */
+/*       GST_TIME_ARGS (((TSPcrOffset *) b)->gsttime), ((TSPcrOffset *) b)->offset); */
+
+  if (((TSPcrOffset *) a)->gsttime < ((TSPcrOffset *) b)->gsttime)
+    return -1;
+  else if (((TSPcrOffset *) a)->gsttime > ((TSPcrOffset *) b)->gsttime)
+    return 1;
+  else
+    return 0;
+}
+
+static GstFlowReturn
+gst_ts_demux_perform_seek (MpegTSBase * base, GstSegment * segment)
+{
+  GstTSDemux *demux = (GstTSDemux *) base;
+  GstFlowReturn res = GST_FLOW_ERROR;
+  int loop_cnt = 0;
+  double bias = 1.0;
+  gint64 desired_offset;
+  gint64 seekpos = 0;
+  gint64 time_diff;
+  GstClockTime seektime;
+  TSPcrOffset seekpcroffset, pcr_start, pcr_stop, *tmp;
+
+  desired_offset = segment->last_stop;
+
+  seektime = desired_offset + demux->first_pcr.gsttime;
+  seekpcroffset.gsttime = seektime;
+
+  GST_DEBUG ("seeking to %" GST_TIME_FORMAT, GST_TIME_ARGS (seektime));
+
+  gst_ts_demux_flush_streams (demux);
+
+  if (G_UNLIKELY (!demux->index)) {
+    GST_ERROR ("no index");
+    goto done;
+  }
+
+  /* get the first index entry before the seek position */
+  tmp = gst_util_array_binary_search (demux->index->data, demux->index_size,
+      sizeof (*tmp), TSPcrOffset_find, GST_SEARCH_MODE_BEFORE, &seekpcroffset,
+      NULL);
+
+  if (G_UNLIKELY (!tmp)) {
+    GST_ERROR ("value not found");
+    goto done;
+  }
+
+  pcr_start = *tmp;
+  pcr_stop = *(++tmp);
+
+  if (G_UNLIKELY (!pcr_stop.offset)) {
+    GST_ERROR ("invalid entry");
     goto done;
   }
+
+  /* check if the last recorded pcr can be used */
+  if (pcr_start.offset < demux->cur_pcr.offset
+      && demux->cur_pcr.offset < pcr_stop.offset) {
+    demux->cur_pcr.gsttime = calculate_gsttime (&pcr_start, demux->cur_pcr.pcr);
+    if (demux->cur_pcr.gsttime < seekpcroffset.gsttime)
+      pcr_start = demux->cur_pcr;
+    else
+      pcr_stop = demux->cur_pcr;
+  }
+
+  GST_DEBUG ("start %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT,
+      GST_TIME_ARGS (pcr_start.gsttime), pcr_start.offset);
+  GST_DEBUG ("stop  %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT,
+      GST_TIME_ARGS (pcr_stop.gsttime), pcr_stop.offset);
+
+  time_diff = seektime - pcr_start.gsttime;
+  seekpcroffset = pcr_start;
+
+  GST_DEBUG ("cur  %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT
+      " time diff: %" G_GINT64_FORMAT,
+      GST_TIME_ARGS (demux->cur_pcr.gsttime), demux->cur_pcr.offset, time_diff);
+
+  /* seek loop */
+  while (loop_cnt++ < 10 && (time_diff < 0 || time_diff > 333 * GST_MSECOND)) {
+    gint64 duration = pcr_stop.gsttime - pcr_start.gsttime;
+    gint64 size = pcr_stop.offset - pcr_start.offset;
+
+    seekpos =
+        pcr_start.offset + size * bias * ((double) (seektime -
+            pcr_start.gsttime) / duration);
+
+    /* look a litle bit behind */
+    seekpos =
+        MAX (pcr_start.offset + 188, seekpos - 55 * MPEGTS_MAX_PACKETSIZE);
+
+    GST_DEBUG ("looking for time: %" GST_TIME_FORMAT " .. %" GST_TIME_FORMAT
+        " .. %" GST_TIME_FORMAT "   bias = %g",
+        GST_TIME_ARGS (pcr_start.gsttime),
+        GST_TIME_ARGS (seektime), GST_TIME_ARGS (pcr_stop.gsttime), bias);
+    GST_DEBUG ("looking in bytes: %" G_GINT64_FORMAT " .. %" G_GINT64_FORMAT
+        " .. %" G_GINT64_FORMAT, pcr_start.offset, seekpos, pcr_stop.offset,
+        bias);
+
+    res =
+        find_pcr_packet (&demux->parent, seekpos, 4000 * MPEGTS_MAX_PACKETSIZE,
+        &seekpcroffset);
+    if (G_UNLIKELY (res == GST_FLOW_UNEXPECTED)) {
+      seekpos =
+          MAX ((gint64) pcr_start.offset,
+          seekpos - 2000 * MPEGTS_MAX_PACKETSIZE) + 188;
+      res =
+          find_pcr_packet (&demux->parent, seekpos,
+          8000 * MPEGTS_MAX_PACKETSIZE, &seekpcroffset);
+    }
+    if (G_UNLIKELY (res != GST_FLOW_OK)) {
+      GST_WARNING ("seeking failed %s", gst_flow_get_name (res));
+      goto done;
+    }
+
+    seekpcroffset.gsttime = calculate_gsttime (&pcr_start, seekpcroffset.pcr);
+
+    bias =
+        1.0 + MAX (-.3, MIN (.3,
+            ((double) seektime - seekpcroffset.gsttime) / duration));
+
+    /* validate */
+    if (G_UNLIKELY ((seekpcroffset.gsttime < pcr_start.gsttime) ||
+            (seekpcroffset.gsttime > pcr_stop.gsttime))) {
+      GST_ERROR ("Unexpected timestamp found, seeking failed! %"
+          GST_TIME_FORMAT, GST_TIME_ARGS (seekpcroffset.gsttime));
+      res = GST_FLOW_ERROR;
+      goto done;
+    }
+
+    if (seekpcroffset.gsttime > seektime) {
+      pcr_stop = seekpcroffset;
+    } else {
+      pcr_start = seekpcroffset;
+    }
+    time_diff = seektime - pcr_start.gsttime;
+    GST_DEBUG ("looking: %" GST_TIME_FORMAT " found: %" GST_TIME_FORMAT
+        " diff = %" G_GINT64_FORMAT, GST_TIME_ARGS (seektime),
+        GST_TIME_ARGS (seekpcroffset.gsttime), time_diff);
+  }
+
+  GST_DEBUG ("seeking finished after %d loops", loop_cnt);
+
+
+  segment->last_stop = seekpcroffset.gsttime;
+  segment->time = seekpcroffset.gsttime;
+
+  /* we stop at the end */
+  if (segment->stop == -1)
+    segment->stop = segment->duration;
+
+  demux->need_newsegment = TRUE;
+  demux->parent.seek_offset = seekpcroffset.offset;
+  GST_DEBUG ("seeked to postion:%" GST_TIME_FORMAT,
+      GST_TIME_ARGS (seekpcroffset.gsttime));
+  res = GST_FLOW_OK;
+
+done:
+  return res;
+}
+
+
+static GstFlowReturn
+gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event)
+{
+  GstTSDemux *demux = (GstTSDemux *) base;
+  GstFlowReturn res = GST_FLOW_ERROR;
+  gdouble rate;
+  gboolean accurate, flush;
+  GstFormat format;
+  GstSeekFlags flags;
+  GstSeekType start_type, stop_type;
+  gint64 start, stop;
+  GstSegment seeksegment;
+  gboolean update;
+
+  gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
+      &stop_type, &stop);
+
+  if (format != GST_FORMAT_TIME) {
+    goto done;
+  }
+
+  GST_DEBUG ("seek event, rate: %f start: %" GST_TIME_FORMAT
+      " stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start),
+      GST_TIME_ARGS (stop));
+
+  accurate = flags & GST_SEEK_FLAG_ACCURATE;
+  flush = flags & GST_SEEK_FLAG_FLUSH;
+
+  if (flags & (GST_SEEK_FLAG_KEY_UNIT | GST_SEEK_FLAG_SEGMENT |
+          GST_SEEK_FLAG_SKIP)) {
+    GST_WARNING ("seek flags 0x%x are not supported", (int) flags);
+    goto done;
+  }
+
+  /* copy segment, we need this because we still need the old
+   * segment when we close the current segment. */
+  memcpy (&seeksegment, &demux->segment, sizeof (GstSegment));
+  /* configure the segment with the seek variables */
+  GST_DEBUG_OBJECT (demux, "configuring seek");
+  GST_DEBUG ("seeksegment: start: %" GST_TIME_FORMAT " stop: %"
+      GST_TIME_FORMAT " time: %" GST_TIME_FORMAT " accum: %" GST_TIME_FORMAT
+      " last_stop: %" GST_TIME_FORMAT " duration: %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (seeksegment.start), GST_TIME_ARGS (seeksegment.stop),
+      GST_TIME_ARGS (seeksegment.time), GST_TIME_ARGS (seeksegment.accum),
+      GST_TIME_ARGS (seeksegment.last_stop),
+      GST_TIME_ARGS (seeksegment.duration));
+  gst_segment_set_seek (&seeksegment, rate, format, flags, start_type, start,
+      stop_type, stop, &update);
+  GST_DEBUG ("seeksegment: start: %" GST_TIME_FORMAT " stop: %"
+      GST_TIME_FORMAT " time: %" GST_TIME_FORMAT " accum: %" GST_TIME_FORMAT
+      " last_stop: %" GST_TIME_FORMAT " duration: %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (seeksegment.start), GST_TIME_ARGS (seeksegment.stop),
+      GST_TIME_ARGS (seeksegment.time), GST_TIME_ARGS (seeksegment.accum),
+      GST_TIME_ARGS (seeksegment.last_stop),
+      GST_TIME_ARGS (seeksegment.duration));
+
+  res = gst_ts_demux_perform_seek (base, &seeksegment);
+  if (G_UNLIKELY (res != GST_FLOW_OK)) {
+    GST_WARNING ("seeking failed %s", gst_flow_get_name (res));
+    goto done;
+  }
+
+  /* commit the new segment */
+  memcpy (&demux->segment, &seeksegment, sizeof (GstSegment));
+
+  if (demux->segment.flags & GST_SEEK_FLAG_SEGMENT) {
+    gst_element_post_message (GST_ELEMENT_CAST (demux),
+        gst_message_new_segment_start (GST_OBJECT_CAST (demux),
+            demux->segment.format, demux->segment.last_stop));
+  }
+
+done:
+  return res;
 }
 
+static gboolean
+gst_ts_demux_srcpad_event (GstPad * pad, GstEvent * event)
+{
+  gboolean res = TRUE;
+  GstTSDemux *demux = GST_TS_DEMUX (gst_pad_get_parent (pad));
+
+  GST_DEBUG_OBJECT (pad, "Got event %s",
+      gst_event_type_get_name (GST_EVENT_TYPE (event)));
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_SEEK:
+      res = mpegts_base_handle_seek_event ((MpegTSBase *) demux, pad, event);
+      if (!res) {
+        GST_WARNING ("seeking failed");
+      }
+      gst_event_unref (event);
+      break;
+    default:
+      res = gst_pad_event_default (pad, event);
+  }
+
+  gst_object_unref (demux);
+  return res;
+}
 
 static gboolean
 push_event (MpegTSBase * base, GstEvent * event)
@@ -675,6 +1005,7 @@ create_pad_for_stream (MpegTSBase * base, MpegTSBaseStream * bstream,
     gst_pad_set_caps (pad, caps);
     gst_pad_set_query_type_function (pad, gst_ts_demux_srcpad_query_types);
     gst_pad_set_query_function (pad, gst_ts_demux_srcpad_query);
+    gst_pad_set_event_function (pad, gst_ts_demux_srcpad_event);
     gst_caps_unref (caps);
   }
 
@@ -726,6 +1057,33 @@ activate_pad_for_stream (GstTSDemux * tsdemux, TSDemuxStream * stream)
 }
 
 static void
+gst_ts_demux_stream_flush (TSDemuxStream * stream)
+{
+  gint i;
+
+  stream->pts = GST_CLOCK_TIME_NONE;
+
+  for (i = 0; i < stream->nbpending; i++)
+    gst_buffer_unref (stream->pendingbuffers[i]);
+  memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
+  stream->nbpending = 0;
+
+  stream->current = NULL;
+}
+
+static void
+gst_ts_demux_flush_streams (GstTSDemux * demux)
+{
+  gint i;
+
+  for (i = 0; i < 0x2000; i++) {
+    if (demux->program->streams[i]) {
+      gst_ts_demux_stream_flush ((TSDemuxStream *) demux->program->streams[i]);
+    }
+  }
+}
+
+static void
 gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
 {
   GstTSDemux *demux = GST_TS_DEMUX (base);
@@ -832,6 +1190,171 @@ process_section (MpegTSBase * base)
   return done;
 }
 
+static gboolean
+process_pes (MpegTSBase * base, TSPcrOffset * pcroffset)
+{
+  gboolean based, done = FALSE;
+  MpegTSPacketizerPacket packet;
+  MpegTSPacketizerPacketReturn pret;
+  GstTSDemux *demux = GST_TS_DEMUX (base);
+  guint16 pcr_pid = 0;
+
+  while ((!done)
+      && ((pret =
+              mpegts_packetizer_next_packet (base->packetizer,
+                  &packet)) != PACKET_NEED_MORE)) {
+    if (G_UNLIKELY (pret == PACKET_BAD))
+      /* bad header, skip the packet */
+      goto next;
+
+    if (demux->program != NULL) {
+      pcr_pid = demux->program->pcr_pid;
+    }
+
+    /* base PSI data */
+    if (packet.payload != NULL && mpegts_base_is_psi (base, &packet)) {
+      MpegTSPacketizerSection section;
+
+      based =
+          mpegts_packetizer_push_section (base->packetizer, &packet, &section);
+      if (G_UNLIKELY (!based))
+        /* bad section data */
+        goto next;
+
+      if (G_LIKELY (section.complete)) {
+        /* section complete */
+        GST_DEBUG ("Section Complete");
+        based = mpegts_base_handle_psi (base, &section);
+        gst_buffer_unref (section.buffer);
+        if (G_UNLIKELY (!based))
+          /* bad PSI table */
+          goto next;
+
+      }
+    }
+    if (packet.pid == pcr_pid && (packet.adaptation_field_control & 0x02)
+        && (packet.afc_flags & MPEGTS_AFC_PCR_FLAG)) {
+      GST_DEBUG ("PCR[0x%x]: %" G_GINT64_FORMAT, packet.pid, packet.pcr);
+      pcroffset->pcr = packet.pcr;
+      pcroffset->offset = packet.offset;
+      done = TRUE;
+    }
+  next:
+    mpegts_packetizer_clear_packet (base->packetizer, &packet);
+  }
+  return done;
+}
+
+static GstFlowReturn
+find_pcr_packet (MpegTSBase * base, guint64 offset, gint64 length,
+    TSPcrOffset * pcroffset)
+{
+  GstFlowReturn ret = GST_FLOW_OK;
+  GstTSDemux *demux = GST_TS_DEMUX (base);
+  MpegTSBaseProgram *program;
+  GstBuffer *buf;
+  gboolean done = FALSE;
+  guint64 scan_offset = 0;
+
+  GST_DEBUG ("Scanning for PCR between:%" G_GINT64_FORMAT
+      " and the end:%" G_GINT64_FORMAT, offset, offset + length);
+
+  /* Get the program */
+  program = demux->program;
+  if (G_UNLIKELY (program == NULL))
+    return GST_FLOW_ERROR;
+
+  mpegts_packetizer_flush (base->packetizer);
+
+  while (!done && scan_offset < length) {
+    ret =
+        gst_pad_pull_range (base->sinkpad, offset + scan_offset,
+        50 * MPEGTS_MAX_PACKETSIZE, &buf);
+    if (ret != GST_FLOW_OK)
+      goto beach;
+    mpegts_packetizer_push (base->packetizer, buf);
+    done = process_pes (base, pcroffset);
+    scan_offset += 50 * MPEGTS_MAX_PACKETSIZE;
+  }
+
+  if (!done || scan_offset >= length) {
+    GST_WARNING ("No PCR found!");
+    ret = GST_FLOW_ERROR;
+    goto beach;
+  }
+
+beach:
+  mpegts_packetizer_flush (base->packetizer);
+  return ret;
+}
+
+static gboolean
+verify_timestamps (MpegTSBase * base, TSPcrOffset * first, TSPcrOffset * last)
+{
+  GstTSDemux *demux = GST_TS_DEMUX (base);
+  guint64 length = 4000 * MPEGTS_MAX_PACKETSIZE;
+  guint64 offset = PCR_WRAP_SIZE_128KBPS;
+
+  demux->index =
+      g_array_sized_new (TRUE, TRUE, sizeof (*first),
+      2 + 1 + ((last->offset - first->offset) / PCR_WRAP_SIZE_128KBPS));
+
+  first->gsttime = PCRTIME_TO_GSTTIME (first->pcr);
+  demux->index = g_array_append_val (demux->index, *first);
+  demux->index_size++;
+  demux->first_pcr = *first;
+  demux->index_pcr = *first;
+  GST_DEBUG ("first time: %" GST_TIME_FORMAT " pcr: %" GST_TIME_FORMAT
+      " offset: %" G_GINT64_FORMAT
+      " last  pcr: %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT,
+      GST_TIME_ARGS (first->gsttime),
+      GST_TIME_ARGS (PCRTIME_TO_GSTTIME (first->pcr)), first->offset,
+      GST_TIME_ARGS (PCRTIME_TO_GSTTIME (last->pcr)), last->offset);
+
+  while (offset + length < last->offset) {
+    TSPcrOffset half;
+    GstFlowReturn ret;
+    gint tries = 0;
+
+  retry:
+    ret = find_pcr_packet (base, offset, length, &half);
+    if (G_UNLIKELY (ret != GST_FLOW_OK)) {
+      GST_WARNING ("no pcr found, retrying");
+      if (tries++ < 3) {
+        offset += length;
+        length *= 2;
+        goto retry;
+      }
+      return FALSE;
+    }
+
+    half.gsttime = calculate_gsttime (first, half.pcr);
+
+    GST_DEBUG ("add half time: %" GST_TIME_FORMAT " pcr: %" GST_TIME_FORMAT
+        " offset: %" G_GINT64_FORMAT,
+        GST_TIME_ARGS (half.gsttime),
+        GST_TIME_ARGS (PCRTIME_TO_GSTTIME (half.pcr)), half.offset);
+    demux->index = g_array_append_val (demux->index, half);
+    demux->index_size++;
+
+    length = 4000 * MPEGTS_MAX_PACKETSIZE;
+    offset += PCR_WRAP_SIZE_128KBPS;
+    *first = half;
+  }
+
+  last->gsttime = calculate_gsttime (first, last->pcr);
+
+  GST_DEBUG ("add last time: %" GST_TIME_FORMAT " pcr: %" GST_TIME_FORMAT
+      " offset: %" G_GINT64_FORMAT,
+      GST_TIME_ARGS (last->gsttime),
+      GST_TIME_ARGS (PCRTIME_TO_GSTTIME (last->pcr)), last->offset);
+
+  demux->index = g_array_append_val (demux->index, *last);
+  demux->index_size++;
+
+  demux->last_pcr = *last;
+  return TRUE;
+}
 
 static GstFlowReturn
 find_timestamps (MpegTSBase * base, guint64 initoff, guint64 * offset)
@@ -844,7 +1367,7 @@ find_timestamps (MpegTSBase * base, guint64 initoff, guint64 * offset)
   gint64 total_bytes;
   guint64 scan_offset;
   guint i = 0;
-  GstClockTime initial, final;
+  TSPcrOffset initial, final;
   GstTSDemux *demux = GST_TS_DEMUX (base);
 
   GST_DEBUG ("Scanning for timestamps");
@@ -875,6 +1398,8 @@ find_timestamps (MpegTSBase * base, guint64 initoff, guint64 * offset)
   mpegts_packetizer_clear (base->packetizer);
   /* Remove current program so we ensure looking for a PAT when scanning the 
    * for the final PCR */
+  gst_structure_free (base->pat);
+  base->pat = NULL;
   mpegts_base_remove_program (base, demux->current_program_number);
 
   if (ret != GST_FLOW_OK && ret != GST_FLOW_UNEXPECTED) {
@@ -922,8 +1447,11 @@ find_timestamps (MpegTSBase * base, guint64 initoff, guint64 * offset)
     goto beach;
   }
 
-  demux->duration = final - initial;
+  verify_timestamps (base, &initial, &final);
 
+  gst_segment_set_duration (&demux->segment, GST_FORMAT_TIME,
+      demux->last_pcr.gsttime - demux->first_pcr.gsttime);
+  demux->duration = demux->last_pcr.gsttime - demux->first_pcr.gsttime;
   GST_DEBUG ("Done, duration:%" GST_TIME_FORMAT,
       GST_TIME_ARGS (demux->duration));
 
@@ -931,13 +1459,15 @@ beach:
 
   mpegts_packetizer_clear (base->packetizer);
   /* Remove current program */
+  gst_structure_free (base->pat);
+  base->pat = NULL;
   mpegts_base_remove_program (base, demux->current_program_number);
 
   return ret;
 }
 
 static GstFlowReturn
-process_pcr (MpegTSBase * base, guint64 initoff, GstClockTime * pcr,
+process_pcr (MpegTSBase * base, guint64 initoff, TSPcrOffset * pcroffset,
     guint numpcr, gboolean isinitial)
 {
   GstTSDemux *demux = GST_TS_DEMUX (base);
@@ -988,15 +1518,31 @@ process_pcr (MpegTSBase * base, guint64 initoff, GstClockTime * pcr,
     offset = 0;
     size = GST_BUFFER_SIZE (buf);
 
-    /* FIXME : We should jump to next packet instead of scanning everything */
-    while ((size >= br.size) && (nbpcr < numpcr)
-        && (offset =
-            gst_byte_reader_masked_scan_uint32 (&br, pcrmask, pcrpattern,
-                offset, size)) != -1) {
+  resync:
+    offset = gst_byte_reader_masked_scan_uint32 (&br, 0xff000000, 0x47000000,
+        0, base->packetsize);
+
+    if (offset == -1)
+      continue;
+
+    while ((nbpcr < numpcr) && (size >= base->packetsize)) {
+
+      guint32 header = GST_READ_UINT32_BE (br.data + offset);
+
+      if ((header >> 24) != 0x47)
+        goto resync;
+
+      if ((header & pcrmask) != pcrpattern) {
+        /* Move offset forward by 1 packet */
+        size -= base->packetsize;
+        offset += base->packetsize;
+        continue;
+      }
+
       /* Potential PCR */
 /*      GST_DEBUG ("offset %" G_GUINT64_FORMAT, GST_BUFFER_OFFSET (buf) + offset);
       GST_MEMDUMP ("something", GST_BUFFER_DATA (buf) + offset, 16);*/
-      if ((*(br.data + offset + 5)) & 0x10) {
+      if ((*(br.data + offset + 5)) & MPEGTS_AFC_PCR_FLAG) {
         guint64 lpcr = mpegts_packetizer_compute_pcr (br.data + offset + 6);
 
         GST_INFO ("Found PCR %" G_GUINT64_FORMAT " %" GST_TIME_FORMAT
@@ -1011,6 +1557,9 @@ process_pcr (MpegTSBase * base, guint64 initoff, GstClockTime * pcr,
         if (nbpcr > 1) {
           if (pcrs[nbpcr] == pcrs[nbpcr - 1]) {
             GST_WARNING ("Found same PCR at different offset");
+          } else if (pcrs[nbpcr] < pcrs[nbpcr - 1]) {
+            GST_WARNING ("Found PCR wraparound");
+            nbpcr += 1;
           } else if ((pcrs[nbpcr] - pcrs[nbpcr - 1]) >
               (guint64) 10 * 60 * 27000000) {
             GST_WARNING ("PCR differs with previous PCR by more than 10 mins");
@@ -1019,20 +1568,22 @@ process_pcr (MpegTSBase * base, guint64 initoff, GstClockTime * pcr,
         } else
           nbpcr += 1;
       }
-      /* Move offset forward by 1 */
-      size -= offset + 1;
-      offset += 1;
-
+      /* Move offset forward by 1 packet */
+      size -= base->packetsize;
+      offset += base->packetsize;
     }
   }
 
 beach:
   GST_DEBUG ("Found %d PCR", nbpcr);
   if (nbpcr) {
-    if (isinitial)
-      *pcr = PCRTIME_TO_GSTTIME (pcrs[0]);
-    else
-      *pcr = PCRTIME_TO_GSTTIME (pcrs[nbpcr - 1]);
+    if (isinitial) {
+      pcroffset->pcr = pcrs[0];
+      pcroffset->offset = pcroffs[0];
+    } else {
+      pcroffset->pcr = pcrs[nbpcr - 1];
+      pcroffset->offset = pcroffs[nbpcr - 1];
+    }
     GST_DEBUG ("pcrdiff:%" GST_TIME_FORMAT " offsetdiff %" G_GUINT64_FORMAT,
         GST_TIME_ARGS (PCRTIME_TO_GSTTIME (pcrs[nbpcr - 1] - pcrs[0])),
         pcroffs[nbpcr - 1] - pcroffs[0]);
@@ -1061,6 +1612,18 @@ gst_ts_demux_record_pcr (GstTSDemux * demux, TSDemuxStream * stream,
       G_GUINT64_FORMAT, bs->pid,
       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (pcr)), offset);
 
+  if (G_LIKELY (bs->pid == demux->program->pcr_pid)) {
+    demux->cur_pcr.gsttime = GST_CLOCK_TIME_NONE;
+    demux->cur_pcr.offset = offset;
+    demux->cur_pcr.pcr = pcr;
+    /* set first_pcr in push mode */
+    if (G_UNLIKELY (!demux->first_pcr.gsttime == GST_CLOCK_TIME_NONE)) {
+      demux->first_pcr.gsttime = PCRTIME_TO_GSTTIME (pcr);
+      demux->first_pcr.offset = offset;
+      demux->first_pcr.pcr = pcr;
+    }
+  }
+
   if (G_UNLIKELY (demux->emit_statistics)) {
     GstStructure *st;
     st = gst_structure_id_empty_new (QUARK_TSDEMUX);
@@ -1139,6 +1702,36 @@ gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream,
   }
 }
 
+static inline GstClockTime
+calc_gsttime_from_pts (TSPcrOffset * start, guint64 pts)
+{
+  GstClockTime time = start->gsttime - PCRTIME_TO_GSTTIME (start->pcr);
+
+  if (start->pcr > pts * 300)
+    time += PCRTIME_TO_GSTTIME (PCR_MAX_VALUE) + MPEGTIME_TO_GSTTIME (pts);
+  else
+    time += MPEGTIME_TO_GSTTIME (pts);
+
+  return time;
+}
+
+static gint
+TSPcrOffset_find_offset (gconstpointer a, gconstpointer b, gpointer user_data)
+{
+
+/*   GST_INFO ("a: %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT, */
+/*       GST_TIME_ARGS (((TSPcrOffset *) a)->gsttime), ((TSPcrOffset *) a)->offset); */
+/*   GST_INFO ("b: %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT, */
+/*       GST_TIME_ARGS (((TSPcrOffset *) b)->gsttime), ((TSPcrOffset *) b)->offset); */
+
+  if (((TSPcrOffset *) a)->offset < ((TSPcrOffset *) b)->offset)
+    return -1;
+  else if (((TSPcrOffset *) a)->offset > ((TSPcrOffset *) b)->offset)
+    return 1;
+  else
+    return 0;
+}
+
 static GstFlowReturn
 gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream)
 {
@@ -1214,12 +1807,32 @@ gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream)
 
     /*  PTS                             32 */
     if ((p2 & 0x80)) {          /* PTS */
+      GstClockTime time;
+      guint64 offset = GST_BUFFER_OFFSET (stream->pendingbuffers[0]);
+
       READ_TS (data, pts, discont);
-      gst_ts_demux_record_pts (demux, stream, pts,
-          GST_BUFFER_OFFSET (stream->pendingbuffers[0]));
+      gst_ts_demux_record_pts (demux, stream, pts, offset);
       length -= 4;
-      GST_BUFFER_TIMESTAMP (stream->pendingbuffers[0]) =
-          MPEGTIME_TO_GSTTIME (pts);
+
+      if (demux->index_pcr.offset + PCR_WRAP_SIZE_128KBPS + 1000 * 128 < offset
+          || (demux->index_pcr.offset > offset)) {
+        /* find next entry */
+        TSPcrOffset *next;
+        demux->index_pcr.offset = offset;
+        next = gst_util_array_binary_search (demux->index->data,
+            demux->index_size, sizeof (*next), TSPcrOffset_find_offset,
+            GST_SEARCH_MODE_BEFORE, &demux->index_pcr, NULL);
+        if (next) {
+          GST_INFO ("new index_pcr %" GST_TIME_FORMAT " offset: %"
+              G_GINT64_FORMAT, GST_TIME_ARGS (next->gsttime), next->offset);
+
+          demux->index_pcr = *next;
+        }
+      }
+
+      time = calc_gsttime_from_pts (&demux->index_pcr, pts);
+
+      GST_BUFFER_TIMESTAMP (stream->pendingbuffers[0]) = time;
 
       if (!GST_CLOCK_TIME_IS_VALID (stream->pts)) {
         stream->pts = GST_BUFFER_TIMESTAMP (stream->pendingbuffers[0]);
@@ -1344,7 +1957,6 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
 
   guint i;
   GstClockTime tinypts = GST_CLOCK_TIME_NONE;
-  GstClockTime stop = GST_CLOCK_TIME_NONE;
   GstEvent *newsegmentevent;
 
   GST_DEBUG ("stream:%p, pid:0x%04x stream_type:%d state:%d pad:%s:%s",
@@ -1381,24 +1993,27 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
                     tinypts))
               tinypts = ((TSDemuxStream *) demux->program->streams[i])->pts;
           }
-
-
         }
 
-        if (GST_CLOCK_TIME_IS_VALID (demux->duration))
-          stop = tinypts + demux->duration;
-
-        GST_DEBUG ("Sending newsegment event");
+        GST_DEBUG ("segment: tinypts: %" GST_TIME_FORMAT " stop: %"
+            GST_TIME_FORMAT " time: %" GST_TIME_FORMAT,
+            GST_TIME_ARGS (tinypts),
+            GST_TIME_ARGS (demux->first_pcr.gsttime + demux->duration),
+            GST_TIME_ARGS (tinypts - demux->first_pcr.gsttime));
         newsegmentevent =
-            gst_event_new_new_segment (0, 1.0, GST_FORMAT_TIME, tinypts, stop,
-            0);
+            gst_event_new_new_segment (0, 1.0, GST_FORMAT_TIME, tinypts,
+            demux->first_pcr.gsttime + demux->duration,
+            tinypts - demux->first_pcr.gsttime);
 
         push_event ((MpegTSBase *) demux, newsegmentevent);
 
         demux->need_newsegment = FALSE;
       }
 
-      GST_DEBUG_OBJECT (stream->pad, "Pushing buffer list ");
+      GST_DEBUG_OBJECT (stream->pad,
+          "Pushing buffer list with timestamp: %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (gst_buffer_list_get
+                  (stream->current, 0, 0))));
 
       res = gst_pad_push_list (stream->pad, stream->current);
       GST_DEBUG_OBJECT (stream->pad, "Returned %s", gst_flow_get_name (res));
index bfee2df..636bc3a 100644 (file)
@@ -48,6 +48,14 @@ G_BEGIN_DECLS
 #define GST_TS_DEMUX_CAST(obj) ((GstTSDemux*) obj)
 typedef struct _GstTSDemux GstTSDemux;
 typedef struct _GstTSDemuxClass GstTSDemuxClass;
+typedef struct _TSPcrOffset TSPcrOffset;
+
+struct _TSPcrOffset
+{
+  guint64 gsttime;
+  guint64 pcr;
+  guint64 offset;
+};
 
 struct _GstTSDemux
 {
@@ -62,7 +70,16 @@ struct _GstTSDemux
   MpegTSBaseProgram *program;  /* Current program */
   guint        current_program_number;
   gboolean need_newsegment;
+  GstSegment segment;
   GstClockTime duration;       /* Total duration */
+
+  /* pcr wrap and seeking */
+  GArray *index;
+  gint index_size;
+  TSPcrOffset first_pcr;
+  TSPcrOffset last_pcr;
+  TSPcrOffset cur_pcr;
+  TSPcrOffset index_pcr;
 };
 
 struct _GstTSDemuxClass