Tizen 2.0 Release
[framework/multimedia/gst-plugins-bad0.10.git] / gst / mpegtsdemux / tsdemux.c
1 /*
2  * tsdemux.c
3  * Copyright (C) 2009 Zaheer Abbas Merali
4  *               2010 Edward Hervey
5  * Copyright (C) 2011, Hewlett-Packard Development Company, L.P.
6  *  Author: Youness Alaoui <youness.alaoui@collabora.co.uk>, Collabora Ltd.
7  *  Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>, Collabora Ltd.
8  *  Author: Edward Hervey <bilboed@bilboed.com>, Collabora Ltd.
9  *
10  * Authors:
11  *   Zaheer Abbas Merali <zaheerabbas at merali dot org>
12  *   Edward Hervey <edward.hervey@collabora.co.uk>
13  *
14  * This library is free software; you can redistribute it and/or
15  * modify it under the terms of the GNU Library General Public
16  * License as published by the Free Software Foundation; either
17  * version 2 of the License, or (at your option) any later version.
18  *
19  * This library is distributed in the hope that it will be useful,
20  * but WITHOUT ANY WARRANTY; without even the implied warranty of
21  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22  * Library General Public License for more details.
23  *
24  * You should have received a copy of the GNU Library General Public
25  * License along with this library; if not, write to the
26  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
27  * Boston, MA 02111-1307, USA.
28  */
29
30 #ifdef HAVE_CONFIG_H
31 #include "config.h"
32 #endif
33
34 #include <stdlib.h>
35 #include <string.h>
36
37 #include <glib.h>
38
39 #include "mpegtsbase.h"
40 #include "tsdemux.h"
41 #include "gstmpegdesc.h"
42 #include "gstmpegdefs.h"
43 #include "mpegtspacketizer.h"
44 #include "payload_parsers.h"
45 #include "pesparse.h"
46
47 /* 
48  * tsdemux
49  *
50  * See TODO for explanations on improvements needed
51  */
52
53 /* latency in mseconds */
54 #define TS_LATENCY 700
55
56 #define TABLE_ID_UNSET 0xFF
57
58 /* Size of the pendingbuffers array. */
59 #define TS_MAX_PENDING_BUFFERS  256
60
61 #define PCR_WRAP_SIZE_128KBPS (((gint64)1490)*(1024*1024))
62 /* small PCR for wrap detection */
63 #define PCR_SMALL 17775000
64 /* maximal PCR time */
65 #define PCR_MAX_VALUE (((((guint64)1)<<33) * 300) + 298)
66 #define PTS_DTS_MAX_VALUE (((guint64)1) << 33)
67
68 /* seek to SEEK_TIMESTAMP_OFFSET before the desired offset and search then
69  * either accurately or for the next timestamp
70  */
71 #define SEEK_TIMESTAMP_OFFSET (1000 * GST_MSECOND)
72
73 GST_DEBUG_CATEGORY_STATIC (ts_demux_debug);
74 #define GST_CAT_DEFAULT ts_demux_debug
75
76 #define ABSDIFF(a,b) (((a) > (b)) ? ((a) - (b)) : ((b) - (a)))
77
78 static GQuark QUARK_TSDEMUX;
79 static GQuark QUARK_PID;
80 static GQuark QUARK_PCR;
81 static GQuark QUARK_OPCR;
82 static GQuark QUARK_PTS;
83 static GQuark QUARK_DTS;
84 static GQuark QUARK_OFFSET;
85
86 typedef enum
87 {
88   PENDING_PACKET_EMPTY = 0,     /* No pending packet/buffer
89                                  * Push incoming buffers to the array */
90   PENDING_PACKET_HEADER,        /* PES header needs to be parsed
91                                  * Push incoming buffers to the array */
92   PENDING_PACKET_BUFFER,        /* Currently filling up output buffer
93                                  * Push incoming buffers to the bufferlist */
94   PENDING_PACKET_DISCONT        /* Discontinuity in incoming packets
95                                  * Drop all incoming buffers */
96 } PendingPacketState;
97
98 typedef struct _TSDemuxStream TSDemuxStream;
99
100 struct _TSDemuxStream
101 {
102   MpegTSBaseStream stream;
103
104   GstPad *pad;
105
106   /* the return of the latest push */
107   GstFlowReturn flow_return;
108
109   /* Output data */
110   PendingPacketState state;
111   /* Pending buffers array. */
112   /* These buffers are stored in this array until the PES header (if needed)
113    * is succesfully parsed. */
114   GstBuffer *pendingbuffers[TS_MAX_PENDING_BUFFERS];
115   guint8 nbpending;
116
117   /* Current data to be pushed out */
118   GstBufferList *current;
119   GstBufferListIterator *currentit;
120   GList *currentlist;
121
122   /* Current PTS/DTS for this stream */
123   GstClockTime pts;
124   GstClockTime dts;
125   /* Raw value of current PTS/DTS */
126   guint64 raw_pts;
127   guint64 raw_dts;
128   /* Number of rollover seen for PTS/DTS (default:0) */
129   guint nb_pts_rollover;
130   guint nb_dts_rollover;
131 };
132
133 #define VIDEO_CAPS \
134   GST_STATIC_CAPS (\
135     "video/mpeg, " \
136       "mpegversion = (int) { 1, 2, 4 }, " \
137       "systemstream = (boolean) FALSE; " \
138     "video/x-h264,stream-format=(string)byte-stream," \
139       "alignment=(string)nal;" \
140     "video/x-dirac;" \
141     "video/x-wmv," \
142       "wmvversion = (int) 3, " \
143       "format = (fourcc) WVC1" \
144   )
145
146 #define AUDIO_CAPS \
147   GST_STATIC_CAPS ( \
148     "audio/mpeg, " \
149       "mpegversion = (int) 1;" \
150     "audio/mpeg, " \
151       "mpegversion = (int) 4, " \
152       "stream-format = (string) adts; " \
153     "audio/x-lpcm, " \
154       "width = (int) { 16, 20, 24 }, " \
155       "rate = (int) { 48000, 96000 }, " \
156       "channels = (int) [ 1, 8 ], " \
157       "dynamic_range = (int) [ 0, 255 ], " \
158       "emphasis = (boolean) { FALSE, TRUE }, " \
159       "mute = (boolean) { FALSE, TRUE }; " \
160     "audio/x-ac3; audio/x-eac3;" \
161     "audio/x-dts;" \
162     "audio/x-private-ts-lpcm" \
163   )
164
165 /* Can also use the subpicture pads for text subtitles? */
166 #define SUBPICTURE_CAPS \
167     GST_STATIC_CAPS ("subpicture/x-pgs; video/x-dvd-subpicture")
168
169 static GstStaticPadTemplate video_template =
170 GST_STATIC_PAD_TEMPLATE ("video_%04x", GST_PAD_SRC,
171     GST_PAD_SOMETIMES,
172     VIDEO_CAPS);
173
174 static GstStaticPadTemplate audio_template =
175 GST_STATIC_PAD_TEMPLATE ("audio_%04x",
176     GST_PAD_SRC,
177     GST_PAD_SOMETIMES,
178     AUDIO_CAPS);
179
180 static GstStaticPadTemplate subpicture_template =
181 GST_STATIC_PAD_TEMPLATE ("subpicture_%04x",
182     GST_PAD_SRC,
183     GST_PAD_SOMETIMES,
184     SUBPICTURE_CAPS);
185
186 static GstStaticPadTemplate private_template =
187 GST_STATIC_PAD_TEMPLATE ("private_%04x",
188     GST_PAD_SRC,
189     GST_PAD_SOMETIMES,
190     GST_STATIC_CAPS_ANY);
191
192 enum
193 {
194   ARG_0,
195   PROP_PROGRAM_NUMBER,
196   PROP_EMIT_STATS,
197   /* FILL ME */
198 };
199
200 /* Pad functions */
201 static const GstQueryType *gst_ts_demux_srcpad_query_types (GstPad * pad);
202 static gboolean gst_ts_demux_srcpad_query (GstPad * pad, GstQuery * query);
203
204
205 /* mpegtsbase methods */
206 static void
207 gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program);
208 static void gst_ts_demux_reset (MpegTSBase * base);
209 static GstFlowReturn
210 gst_ts_demux_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
211     MpegTSPacketizerSection * section);
212 static void gst_ts_demux_flush (MpegTSBase * base);
213 static void
214 gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * stream,
215     MpegTSBaseProgram * program);
216 static void
217 gst_ts_demux_stream_removed (MpegTSBase * base, MpegTSBaseStream * stream);
218 static GstFlowReturn gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event,
219     guint16 pid);
220 static GstFlowReturn find_pcr_packet (MpegTSBase * base, guint64 offset,
221     gint64 length, TSPcrOffset * pcroffset);
222 static GstFlowReturn find_timestamps (MpegTSBase * base, guint64 initoff,
223     guint64 * offset);
224 static void gst_ts_demux_set_property (GObject * object, guint prop_id,
225     const GValue * value, GParamSpec * pspec);
226 static void gst_ts_demux_get_property (GObject * object, guint prop_id,
227     GValue * value, GParamSpec * pspec);
228 static void gst_ts_demux_finalize (GObject * object);
229 static GstFlowReturn
230 process_pcr (MpegTSBase * base, guint64 initoff, TSPcrOffset * pcroffset,
231     guint numpcr, gboolean isinitial);
232 static void gst_ts_demux_flush_streams (GstTSDemux * tsdemux);
233 static GstFlowReturn
234 gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream);
235
236 static gboolean push_event (MpegTSBase * base, GstEvent * event);
237 static void _extra_init (GType type);
238
239 GST_BOILERPLATE_FULL (GstTSDemux, gst_ts_demux, MpegTSBase,
240     GST_TYPE_MPEGTS_BASE, _extra_init);
241
242 static void
243 _extra_init (GType type)
244 {
245   QUARK_TSDEMUX = g_quark_from_string ("tsdemux");
246   QUARK_PID = g_quark_from_string ("pid");
247   QUARK_PCR = g_quark_from_string ("pcr");
248   QUARK_OPCR = g_quark_from_string ("opcr");
249   QUARK_PTS = g_quark_from_string ("pts");
250   QUARK_DTS = g_quark_from_string ("dts");
251   QUARK_OFFSET = g_quark_from_string ("offset");
252 }
253
254 static void
255 gst_ts_demux_base_init (gpointer klass)
256 {
257   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
258
259   gst_element_class_add_static_pad_template (element_class,
260       &video_template);
261   gst_element_class_add_static_pad_template (element_class,
262       &audio_template);
263   gst_element_class_add_static_pad_template (element_class,
264       &subpicture_template);
265   gst_element_class_add_static_pad_template (element_class,
266       &private_template);
267
268   gst_element_class_set_details_simple (element_class,
269       "MPEG transport stream demuxer",
270       "Codec/Demuxer",
271       "Demuxes MPEG2 transport streams",
272       "Zaheer Abbas Merali <zaheerabbas at merali dot org>\n"
273       "Edward Hervey <edward.hervey@collabora.co.uk>");
274 }
275
276 static void
277 gst_ts_demux_class_init (GstTSDemuxClass * klass)
278 {
279   GObjectClass *gobject_class;
280   MpegTSBaseClass *ts_class;
281
282   gobject_class = G_OBJECT_CLASS (klass);
283   gobject_class->set_property = gst_ts_demux_set_property;
284   gobject_class->get_property = gst_ts_demux_get_property;
285   gobject_class->finalize = gst_ts_demux_finalize;
286
287   g_object_class_install_property (gobject_class, PROP_PROGRAM_NUMBER,
288       g_param_spec_int ("program-number", "Program number",
289           "Program Number to demux for (-1 to ignore)", -1, G_MAXINT,
290           -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
291
292   g_object_class_install_property (gobject_class, PROP_EMIT_STATS,
293       g_param_spec_boolean ("emit-stats", "Emit statistics",
294           "Emit messages for every pcr/opcr/pts/dts", FALSE,
295           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
296
297
298   ts_class = GST_MPEGTS_BASE_CLASS (klass);
299   ts_class->reset = GST_DEBUG_FUNCPTR (gst_ts_demux_reset);
300   ts_class->push = GST_DEBUG_FUNCPTR (gst_ts_demux_push);
301   ts_class->push_event = GST_DEBUG_FUNCPTR (push_event);
302   ts_class->program_started = GST_DEBUG_FUNCPTR (gst_ts_demux_program_started);
303   ts_class->stream_added = gst_ts_demux_stream_added;
304   ts_class->stream_removed = gst_ts_demux_stream_removed;
305   ts_class->find_timestamps = GST_DEBUG_FUNCPTR (find_timestamps);
306   ts_class->seek = GST_DEBUG_FUNCPTR (gst_ts_demux_do_seek);
307   ts_class->flush = GST_DEBUG_FUNCPTR (gst_ts_demux_flush);
308 }
309
310 static void
311 gst_ts_demux_init (GstTSDemux * demux, GstTSDemuxClass * klass)
312 {
313   demux->need_newsegment = TRUE;
314   demux->program_number = -1;
315   demux->duration = GST_CLOCK_TIME_NONE;
316   GST_MPEGTS_BASE (demux)->stream_size = sizeof (TSDemuxStream);
317   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
318   demux->first_pcr = (TSPcrOffset) {
319   GST_CLOCK_TIME_NONE, 0, 0};
320   demux->cur_pcr = (TSPcrOffset) {
321   0};
322   demux->last_pcr = (TSPcrOffset) {
323   0};
324 }
325
326 static void
327 gst_ts_demux_reset (MpegTSBase * base)
328 {
329   GstTSDemux *demux = (GstTSDemux *) base;
330
331   if (demux->index) {
332     g_array_free (demux->index, TRUE);
333     demux->index = NULL;
334   }
335   demux->index_size = 0;
336   demux->need_newsegment = TRUE;
337   demux->program_number = -1;
338   demux->duration = GST_CLOCK_TIME_NONE;
339   gst_segment_init (&demux->segment, GST_FORMAT_TIME);
340   demux->first_pcr = (TSPcrOffset) {
341   GST_CLOCK_TIME_NONE, 0, 0};
342   demux->cur_pcr = (TSPcrOffset) {
343   0};
344   demux->last_pcr = (TSPcrOffset) {
345   0};
346 }
347
348 static void
349 gst_ts_demux_finalize (GObject * object)
350 {
351   if (G_OBJECT_CLASS (parent_class)->finalize)
352     G_OBJECT_CLASS (parent_class)->finalize (object);
353 }
354
355
356
357 static void
358 gst_ts_demux_set_property (GObject * object, guint prop_id,
359     const GValue * value, GParamSpec * pspec)
360 {
361   GstTSDemux *demux = GST_TS_DEMUX (object);
362
363   switch (prop_id) {
364     case PROP_PROGRAM_NUMBER:
365       /* FIXME: do something if program is switched as opposed to set at
366        * beginning */
367       demux->program_number = g_value_get_int (value);
368       break;
369     case PROP_EMIT_STATS:
370       demux->emit_statistics = g_value_get_boolean (value);
371       break;
372     default:
373       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
374   }
375 }
376
377 static void
378 gst_ts_demux_get_property (GObject * object, guint prop_id,
379     GValue * value, GParamSpec * pspec)
380 {
381   GstTSDemux *demux = GST_TS_DEMUX (object);
382
383   switch (prop_id) {
384     case PROP_PROGRAM_NUMBER:
385       g_value_set_int (value, demux->program_number);
386       break;
387     case PROP_EMIT_STATS:
388       g_value_set_boolean (value, demux->emit_statistics);
389       break;
390     default:
391       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
392   }
393 }
394
395 static const GstQueryType *
396 gst_ts_demux_srcpad_query_types (GstPad * pad)
397 {
398   static const GstQueryType query_types[] = {
399     GST_QUERY_DURATION,
400     GST_QUERY_SEEKING,
401     0
402   };
403
404   return query_types;
405 }
406
407 static gboolean
408 gst_ts_demux_srcpad_query (GstPad * pad, GstQuery * query)
409 {
410   gboolean res = TRUE;
411   GstFormat format;
412   GstTSDemux *demux;
413   MpegTSBase *base;
414
415   demux = GST_TS_DEMUX (gst_pad_get_parent (pad));
416   base = GST_MPEGTS_BASE (demux);
417
418   switch (GST_QUERY_TYPE (query)) {
419     case GST_QUERY_DURATION:
420       GST_DEBUG ("query duration");
421       gst_query_parse_duration (query, &format, NULL);
422       if (format == GST_FORMAT_TIME) {
423         if (!gst_pad_peer_query (base->sinkpad, query))
424           gst_query_set_duration (query, GST_FORMAT_TIME,
425               demux->segment.duration);
426       } else {
427         GST_DEBUG_OBJECT (demux, "only query duration on TIME is supported");
428         res = FALSE;
429       }
430       break;
431     case GST_QUERY_SEEKING:
432       GST_DEBUG ("query seeking");
433       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
434       if (format == GST_FORMAT_TIME) {
435         gboolean seekable = FALSE;
436
437         if (gst_pad_peer_query (base->sinkpad, query))
438           gst_query_parse_seeking (query, NULL, &seekable, NULL, NULL);
439
440         /* If upstream is not seekable in TIME format we use
441          * our own values here */
442         if (!seekable)
443           gst_query_set_seeking (query, GST_FORMAT_TIME,
444               demux->parent.mode != BASE_MODE_PUSHING, 0,
445               demux->segment.duration);
446       } else {
447         GST_DEBUG_OBJECT (demux, "only TIME is supported for query seeking");
448         res = FALSE;
449       }
450       break;
451     default:
452       res = gst_pad_query_default (pad, query);
453   }
454
455   gst_object_unref (demux);
456   return res;
457
458 }
459
460 static inline GstClockTime
461 calculate_gsttime (TSPcrOffset * start, guint64 pcr)
462 {
463
464   GstClockTime time = start->gsttime;
465
466   if (start->pcr > pcr)
467     time += PCRTIME_TO_GSTTIME (PCR_MAX_VALUE - start->pcr) +
468         PCRTIME_TO_GSTTIME (pcr);
469   else
470     time += PCRTIME_TO_GSTTIME (pcr - start->pcr);
471
472   return time;
473 }
474
475 static GstFlowReturn
476 gst_ts_demux_parse_pes_header_pts (GstTSDemux * demux,
477     MpegTSPacketizerPacket * packet, guint64 * time)
478 {
479   PESHeader header;
480   gint offset = 0;
481
482   if (mpegts_parse_pes_header (packet->payload,
483           packet->data_end - packet->payload, &header, &offset))
484     return GST_FLOW_ERROR;
485
486   *time = header.PTS;
487   return GST_FLOW_OK;
488 }
489
490 /* performs a accurate/key_unit seek */
491 static GstFlowReturn
492 gst_ts_demux_perform_auxiliary_seek (MpegTSBase * base, GstClockTime seektime,
493     TSPcrOffset * pcroffset, gint64 length, gint16 pid, GstSeekFlags flags,
494     payload_parse_keyframe auxiliary_seek_fn)
495 {
496   GstTSDemux *demux = (GstTSDemux *) base;
497   GstFlowReturn res = GST_FLOW_ERROR;
498   gboolean done = FALSE;
499   gboolean found_keyframe = FALSE, found_accurate = FALSE, need_more = TRUE;
500   GstBuffer *buf;
501   MpegTSPacketizerPacket packet;
502   MpegTSPacketizerPacketReturn pret;
503   gint64 offset = pcroffset->offset;
504   gint64 scan_offset = MIN (length, 50 * MPEGTS_MAX_PACKETSIZE);
505   guint32 state = 0xffffffff;
506   TSPcrOffset key_pos = { 0 };
507
508   GST_DEBUG ("auxiliary seek for %" GST_TIME_FORMAT " from offset: %"
509       G_GINT64_FORMAT " in %" G_GINT64_FORMAT " bytes for PID: %d "
510       "%s %s", GST_TIME_ARGS (seektime), pcroffset->offset, length, pid,
511       (flags & GST_SEEK_FLAG_ACCURATE) ? "accurate" : "",
512       (flags & GST_SEEK_FLAG_KEY_UNIT) ? "key_unit" : "");
513
514   mpegts_packetizer_flush (base->packetizer);
515
516   if (base->packetizer->packet_size == MPEGTS_M2TS_PACKETSIZE)
517     offset -= 4;
518
519   while (!done && scan_offset <= length) {
520     res =
521         gst_pad_pull_range (base->sinkpad, offset + scan_offset,
522         50 * MPEGTS_MAX_PACKETSIZE, &buf);
523     if (res != GST_FLOW_OK)
524       goto beach;
525     mpegts_packetizer_push (base->packetizer, buf);
526
527     while ((!done)
528         && ((pret =
529                 mpegts_packetizer_next_packet (base->packetizer,
530                     &packet)) != PACKET_NEED_MORE)) {
531       if (G_UNLIKELY (pret == PACKET_BAD))
532         /* bad header, skip the packet */
533         goto next;
534
535       if (packet.payload_unit_start_indicator)
536         GST_DEBUG ("found packet for PID: %d with pcr: %" GST_TIME_FORMAT
537             " at offset: %" G_GINT64_FORMAT, packet.pid,
538             GST_TIME_ARGS (packet.pcr), packet.offset);
539
540       if (packet.payload != NULL && packet.pid == pid) {
541
542         if (packet.payload_unit_start_indicator) {
543           guint64 pts = 0;
544           GstFlowReturn ok =
545               gst_ts_demux_parse_pes_header_pts (demux, &packet, &pts);
546           if (ok == GST_FLOW_OK) {
547             GstClockTime time = calculate_gsttime (pcroffset, pts * 300);
548
549             GST_DEBUG ("packet has PTS: %" GST_TIME_FORMAT,
550                 GST_TIME_ARGS (time));
551
552             if (time <= seektime) {
553               pcroffset->gsttime = time;
554               pcroffset->pcr = packet.pcr;
555               pcroffset->offset = packet.offset;
556             } else
557               found_accurate = TRUE;
558           } else
559             goto next;
560           /* reset state for new packet */
561           state = 0xffffffff;
562           need_more = TRUE;
563         }
564
565         if (auxiliary_seek_fn) {
566           if (need_more) {
567             if (auxiliary_seek_fn (&state, &packet, &need_more)) {
568               found_keyframe = TRUE;
569               key_pos = *pcroffset;
570               GST_DEBUG ("found keyframe: time: %" GST_TIME_FORMAT " pcr: %"
571                   GST_TIME_FORMAT " offset %" G_GINT64_FORMAT,
572                   GST_TIME_ARGS (pcroffset->gsttime),
573                   GST_TIME_ARGS (pcroffset->pcr), pcroffset->offset);
574             }
575           }
576         } else {
577           /* if we don't have a payload parsing function
578            * every frame is a keyframe */
579           found_keyframe = TRUE;
580         }
581       }
582       if (flags & GST_SEEK_FLAG_ACCURATE)
583         done = found_accurate && found_keyframe;
584       else
585         done = found_keyframe;
586       if (done)
587         *pcroffset = key_pos;
588     next:
589       mpegts_packetizer_clear_packet (base->packetizer, &packet);
590     }
591     scan_offset += 50 * MPEGTS_MAX_PACKETSIZE;
592   }
593
594 beach:
595   if (done)
596     res = GST_FLOW_OK;
597   else if (GST_FLOW_OK == res)
598     res = GST_FLOW_CUSTOM_ERROR_1;
599
600   mpegts_packetizer_flush (base->packetizer);
601   return res;
602 }
603
604 static gint
605 TSPcrOffset_find (gconstpointer a, gconstpointer b, gpointer user_data)
606 {
607
608 /*   GST_INFO ("a: %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT, */
609 /*       GST_TIME_ARGS (((TSPcrOffset *) a)->gsttime), ((TSPcrOffset *) a)->offset); */
610 /*   GST_INFO ("b: %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT, */
611 /*       GST_TIME_ARGS (((TSPcrOffset *) b)->gsttime), ((TSPcrOffset *) b)->offset); */
612
613   if (((TSPcrOffset *) a)->gsttime < ((TSPcrOffset *) b)->gsttime)
614     return -1;
615   else if (((TSPcrOffset *) a)->gsttime > ((TSPcrOffset *) b)->gsttime)
616     return 1;
617   else
618     return 0;
619 }
620
621 static GstFlowReturn
622 gst_ts_demux_perform_seek (MpegTSBase * base, GstSegment * segment, guint16 pid)
623 {
624   GstTSDemux *demux = (GstTSDemux *) base;
625   GstFlowReturn res = GST_FLOW_ERROR;
626   int max_loop_cnt, loop_cnt = 0;
627   gint64 seekpos = 0;
628   gint64 time_diff;
629   GstClockTime seektime;
630   TSPcrOffset seekpcroffset, pcr_start, pcr_stop, *tmp;
631
632   max_loop_cnt = (segment->flags & GST_SEEK_FLAG_ACCURATE) ? 25 : 10;
633
634   seektime =
635       MAX (0,
636       segment->last_stop - SEEK_TIMESTAMP_OFFSET) + demux->first_pcr.gsttime;
637   seekpcroffset.gsttime = seektime;
638
639   GST_DEBUG ("seeking to %" GST_TIME_FORMAT, GST_TIME_ARGS (seektime));
640
641   gst_ts_demux_flush_streams (demux);
642
643   if (G_UNLIKELY (!demux->index)) {
644     GST_ERROR ("no index");
645     goto done;
646   }
647
648   /* get the first index entry before the seek position */
649   tmp = gst_util_array_binary_search (demux->index->data, demux->index_size,
650       sizeof (*tmp), TSPcrOffset_find, GST_SEARCH_MODE_BEFORE, &seekpcroffset,
651       NULL);
652
653   if (G_UNLIKELY (!tmp)) {
654     GST_ERROR ("value not found");
655     goto done;
656   }
657
658   pcr_start = *tmp;
659   pcr_stop = *(++tmp);
660
661   if (G_UNLIKELY (!pcr_stop.offset)) {
662     GST_ERROR ("invalid entry");
663     goto done;
664   }
665
666   /* check if the last recorded pcr can be used */
667   if (pcr_start.offset < demux->cur_pcr.offset
668       && demux->cur_pcr.offset < pcr_stop.offset) {
669     demux->cur_pcr.gsttime = calculate_gsttime (&pcr_start, demux->cur_pcr.pcr);
670     if (demux->cur_pcr.gsttime < seekpcroffset.gsttime)
671       pcr_start = demux->cur_pcr;
672     else
673       pcr_stop = demux->cur_pcr;
674   }
675
676   GST_DEBUG ("start %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT,
677       GST_TIME_ARGS (pcr_start.gsttime), pcr_start.offset);
678   GST_DEBUG ("stop  %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT,
679       GST_TIME_ARGS (pcr_stop.gsttime), pcr_stop.offset);
680
681   time_diff = seektime - pcr_start.gsttime;
682   seekpcroffset = pcr_start;
683
684   GST_DEBUG ("cur  %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT
685       " time diff: %" G_GINT64_FORMAT,
686       GST_TIME_ARGS (demux->cur_pcr.gsttime), demux->cur_pcr.offset, time_diff);
687
688   /* seek loop */
689   while (loop_cnt++ < max_loop_cnt && (time_diff > SEEK_TIMESTAMP_OFFSET >> 1)
690       && (pcr_stop.gsttime - pcr_start.gsttime > SEEK_TIMESTAMP_OFFSET)) {
691     gint64 duration = pcr_stop.gsttime - pcr_start.gsttime;
692     gint64 size = pcr_stop.offset - pcr_start.offset;
693
694     if (loop_cnt & 1)
695       seekpos = pcr_start.offset + (size >> 1);
696     else
697       seekpos =
698           pcr_start.offset + size * ((double) (seektime -
699               pcr_start.gsttime) / duration);
700
701     /* look a litle bit behind */
702     seekpos =
703         MAX (pcr_start.offset + 188, seekpos - 55 * MPEGTS_MAX_PACKETSIZE);
704
705     GST_DEBUG ("looking for time: %" GST_TIME_FORMAT " .. %" GST_TIME_FORMAT
706         " .. %" GST_TIME_FORMAT,
707         GST_TIME_ARGS (pcr_start.gsttime),
708         GST_TIME_ARGS (seektime), GST_TIME_ARGS (pcr_stop.gsttime));
709     GST_DEBUG ("looking in bytes: %" G_GINT64_FORMAT " .. %" G_GINT64_FORMAT
710         " .. %" G_GINT64_FORMAT, pcr_start.offset, seekpos, pcr_stop.offset);
711
712     res =
713         find_pcr_packet (&demux->parent, seekpos, 4000 * MPEGTS_MAX_PACKETSIZE,
714         &seekpcroffset);
715     if (G_UNLIKELY (res == GST_FLOW_UNEXPECTED)) {
716       seekpos =
717           MAX ((gint64) pcr_start.offset,
718           seekpos - 2000 * MPEGTS_MAX_PACKETSIZE) + 188;
719       res =
720           find_pcr_packet (&demux->parent, seekpos,
721           8000 * MPEGTS_MAX_PACKETSIZE, &seekpcroffset);
722     }
723     if (G_UNLIKELY (res != GST_FLOW_OK)) {
724       GST_WARNING ("seeking failed %s", gst_flow_get_name (res));
725       goto done;
726     }
727
728     seekpcroffset.gsttime = calculate_gsttime (&pcr_start, seekpcroffset.pcr);
729
730     /* validate */
731     if (G_UNLIKELY ((seekpcroffset.gsttime < pcr_start.gsttime) ||
732             (seekpcroffset.gsttime > pcr_stop.gsttime))) {
733       GST_ERROR ("Unexpected timestamp found, seeking failed! %"
734           GST_TIME_FORMAT, GST_TIME_ARGS (seekpcroffset.gsttime));
735       res = GST_FLOW_ERROR;
736       goto done;
737     }
738
739     if (seekpcroffset.gsttime > seektime) {
740       pcr_stop = seekpcroffset;
741     } else {
742       pcr_start = seekpcroffset;
743     }
744     time_diff = seektime - pcr_start.gsttime;
745     GST_DEBUG ("seeking: %" GST_TIME_FORMAT " found: %" GST_TIME_FORMAT
746         " diff = %" G_GINT64_FORMAT, GST_TIME_ARGS (seektime),
747         GST_TIME_ARGS (seekpcroffset.gsttime), time_diff);
748   }
749
750   GST_DEBUG ("seeking finished after %d loops", loop_cnt);
751
752   /* use correct seek position for the auxiliary search */
753   seektime += SEEK_TIMESTAMP_OFFSET;
754
755   {
756     payload_parse_keyframe keyframe_seek = NULL;
757     MpegTSBaseProgram *program = demux->program;
758     guint64 avg_bitrate, length;
759
760     if (program->streams[pid]) {
761       switch (program->streams[pid]->stream_type) {
762         case ST_VIDEO_MPEG1:
763         case ST_VIDEO_MPEG2:
764           keyframe_seek = gst_tsdemux_has_mpeg2_keyframe;
765           break;
766         case ST_VIDEO_H264:
767           keyframe_seek = gst_tsdemux_has_h264_keyframe;
768           break;
769         case ST_VIDEO_MPEG4:
770         case ST_VIDEO_DIRAC:
771           GST_WARNING ("no payload parser for stream 0x%04x type: 0x%02x", pid,
772               program->streams[pid]->stream_type);
773           break;
774       }
775     } else
776       GST_WARNING ("no stream info for PID: 0x%04x", pid);
777
778     avg_bitrate =
779         (pcr_stop.offset -
780         pcr_start.offset) * 1000 * GST_MSECOND / (pcr_stop.gsttime -
781         pcr_start.gsttime);
782
783     seekpcroffset = pcr_start;
784     /* search in 2500ms for a keyframe */
785     length =
786         MIN (demux->last_pcr.offset - pcr_start.offset,
787         (avg_bitrate * 25) / 10);
788     res =
789         gst_ts_demux_perform_auxiliary_seek (base, seektime, &seekpcroffset,
790         length, pid, segment->flags, keyframe_seek);
791
792     if (res == GST_FLOW_CUSTOM_ERROR_1) {
793       GST_ERROR ("no keyframe found in %" G_GUINT64_FORMAT
794           " bytes starting from %" G_GUINT64_FORMAT, length,
795           seekpcroffset.offset);
796       res = GST_FLOW_ERROR;
797     }
798     if (res != GST_FLOW_OK)
799       goto done;
800   }
801
802
803   /* update seektime to the actual timestamp of the found keyframe */
804   if (segment->flags & GST_SEEK_FLAG_KEY_UNIT)
805     seektime = seekpcroffset.gsttime;
806
807   seektime -= demux->first_pcr.gsttime;
808
809   segment->last_stop = seektime;
810   segment->time = seektime;
811
812   /* we stop at the end */
813   if (segment->stop == -1)
814     segment->stop = demux->first_pcr.gsttime + segment->duration;
815
816   demux->need_newsegment = TRUE;
817   demux->parent.seek_offset = seekpcroffset.offset;
818   GST_DEBUG ("seeked to postion:%" GST_TIME_FORMAT, GST_TIME_ARGS (seektime));
819   res = GST_FLOW_OK;
820
821 done:
822   return res;
823 }
824
825
826 static GstFlowReturn
827 gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event, guint16 pid)
828 {
829   GstTSDemux *demux = (GstTSDemux *) base;
830   GstFlowReturn res = GST_FLOW_ERROR;
831   gdouble rate;
832   GstFormat format;
833   GstSeekFlags flags;
834   GstSeekType start_type, stop_type;
835   gint64 start, stop;
836   GstSegment seeksegment;
837   gboolean update;
838
839   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
840       &stop_type, &stop);
841
842   if (format != GST_FORMAT_TIME) {
843     goto done;
844   }
845
846   GST_DEBUG ("seek event, rate: %f start: %" GST_TIME_FORMAT
847       " stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start),
848       GST_TIME_ARGS (stop));
849
850   if (flags & (GST_SEEK_FLAG_SEGMENT | GST_SEEK_FLAG_SKIP)) {
851     GST_WARNING ("seek flags 0x%x are not supported", (int) flags);
852     goto done;
853   }
854
855   /* copy segment, we need this because we still need the old
856    * segment when we close the current segment. */
857   memcpy (&seeksegment, &demux->segment, sizeof (GstSegment));
858   /* configure the segment with the seek variables */
859   GST_DEBUG_OBJECT (demux, "configuring seek");
860   GST_DEBUG ("seeksegment: start: %" GST_TIME_FORMAT " stop: %"
861       GST_TIME_FORMAT " time: %" GST_TIME_FORMAT " accum: %" GST_TIME_FORMAT
862       " last_stop: %" GST_TIME_FORMAT " duration: %" GST_TIME_FORMAT,
863       GST_TIME_ARGS (seeksegment.start), GST_TIME_ARGS (seeksegment.stop),
864       GST_TIME_ARGS (seeksegment.time), GST_TIME_ARGS (seeksegment.accum),
865       GST_TIME_ARGS (seeksegment.last_stop),
866       GST_TIME_ARGS (seeksegment.duration));
867   gst_segment_set_seek (&seeksegment, rate, format, flags, start_type, start,
868       stop_type, stop, &update);
869   GST_DEBUG ("seeksegment: start: %" GST_TIME_FORMAT " stop: %"
870       GST_TIME_FORMAT " time: %" GST_TIME_FORMAT " accum: %" GST_TIME_FORMAT
871       " last_stop: %" GST_TIME_FORMAT " duration: %" GST_TIME_FORMAT,
872       GST_TIME_ARGS (seeksegment.start), GST_TIME_ARGS (seeksegment.stop),
873       GST_TIME_ARGS (seeksegment.time), GST_TIME_ARGS (seeksegment.accum),
874       GST_TIME_ARGS (seeksegment.last_stop),
875       GST_TIME_ARGS (seeksegment.duration));
876
877   res = gst_ts_demux_perform_seek (base, &seeksegment, pid);
878   if (G_UNLIKELY (res != GST_FLOW_OK)) {
879     GST_WARNING ("seeking failed %s", gst_flow_get_name (res));
880     goto done;
881   }
882
883   /* commit the new segment */
884   memcpy (&demux->segment, &seeksegment, sizeof (GstSegment));
885
886   if (demux->segment.flags & GST_SEEK_FLAG_SEGMENT) {
887     gst_element_post_message (GST_ELEMENT_CAST (demux),
888         gst_message_new_segment_start (GST_OBJECT_CAST (demux),
889             demux->segment.format, demux->segment.last_stop));
890   }
891
892 done:
893   return res;
894 }
895
896 static gboolean
897 gst_ts_demux_srcpad_event (GstPad * pad, GstEvent * event)
898 {
899   gboolean res = TRUE;
900   GstTSDemux *demux = GST_TS_DEMUX (gst_pad_get_parent (pad));
901
902   GST_DEBUG_OBJECT (pad, "Got event %s",
903       gst_event_type_get_name (GST_EVENT_TYPE (event)));
904
905   switch (GST_EVENT_TYPE (event)) {
906     case GST_EVENT_SEEK:
907       res = mpegts_base_handle_seek_event ((MpegTSBase *) demux, pad, event);
908       if (!res)
909         GST_WARNING ("seeking failed");
910       gst_event_unref (event);
911       break;
912     default:
913       res = gst_pad_event_default (pad, event);
914   }
915
916   gst_object_unref (demux);
917   return res;
918 }
919
920 static gboolean
921 push_event (MpegTSBase * base, GstEvent * event)
922 {
923   GstTSDemux *demux = (GstTSDemux *) base;
924   GList *tmp;
925
926   if (G_UNLIKELY (demux->program == NULL))
927     return FALSE;
928
929   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
930     TSDemuxStream *stream = (TSDemuxStream *) tmp->data;
931     if (stream->pad) {
932       gst_event_ref (event);
933       gst_pad_push_event (stream->pad, event);
934     }
935   }
936
937   return TRUE;
938 }
939
940 static GstFlowReturn
941 tsdemux_combine_flows (GstTSDemux * demux, TSDemuxStream * stream,
942     GstFlowReturn ret)
943 {
944   GList *tmp;
945
946   /* Store the value */
947   stream->flow_return = ret;
948
949   /* any other error that is not-linked can be returned right away */
950   if (ret != GST_FLOW_NOT_LINKED)
951     goto done;
952
953   /* Only return NOT_LINKED if all other pads returned NOT_LINKED */
954   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
955     stream = (TSDemuxStream *) tmp->data;
956     if (stream->pad) {
957       ret = stream->flow_return;
958       /* some other return value (must be SUCCESS but we can return
959        * other values as well) */
960       if (ret != GST_FLOW_NOT_LINKED)
961         goto done;
962     }
963     /* if we get here, all other pads were unlinked and we return
964      * NOT_LINKED then */
965   }
966
967 done:
968   return ret;
969 }
970
971 static GstPad *
972 create_pad_for_stream (MpegTSBase * base, MpegTSBaseStream * bstream,
973     MpegTSBaseProgram * program)
974 {
975   TSDemuxStream *stream = (TSDemuxStream *) bstream;
976   gchar *name = NULL;
977   GstCaps *caps = NULL;
978   GstPadTemplate *template = NULL;
979   guint8 *desc = NULL;
980   GstPad *pad = NULL;
981
982
983   GST_LOG ("Attempting to create pad for stream 0x%04x with stream_type %d",
984       bstream->pid, bstream->stream_type);
985
986   switch (bstream->stream_type) {
987     case ST_VIDEO_MPEG1:
988     case ST_VIDEO_MPEG2:
989       GST_LOG ("mpeg video");
990       template = gst_static_pad_template_get (&video_template);
991       name = g_strdup_printf ("video_%04x", bstream->pid);
992       caps = gst_caps_new_simple ("video/mpeg",
993           "mpegversion", G_TYPE_INT,
994           bstream->stream_type == ST_VIDEO_MPEG1 ? 1 : 2, "systemstream",
995           G_TYPE_BOOLEAN, FALSE, NULL);
996
997       break;
998     case ST_AUDIO_MPEG1:
999     case ST_AUDIO_MPEG2:
1000       GST_LOG ("mpeg audio");
1001       template = gst_static_pad_template_get (&audio_template);
1002       name = g_strdup_printf ("audio_%04x", bstream->pid);
1003       caps =
1004           gst_caps_new_simple ("audio/mpeg", "mpegversion", G_TYPE_INT, 1,
1005           NULL);
1006       break;
1007     case ST_PRIVATE_DATA:
1008       GST_LOG ("private data");
1009       desc =
1010           mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
1011           DESC_DVB_AC3);
1012       if (desc) {
1013         GST_LOG ("ac3 audio");
1014         template = gst_static_pad_template_get (&audio_template);
1015         name = g_strdup_printf ("audio_%04x", bstream->pid);
1016         caps = gst_caps_new_simple ("audio/x-ac3", NULL);
1017         g_free (desc);
1018         break;
1019       }
1020       desc =
1021           mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
1022           DESC_DVB_ENHANCED_AC3);
1023       if (desc) {
1024         GST_LOG ("ac3 audio");
1025         template = gst_static_pad_template_get (&audio_template);
1026         name = g_strdup_printf ("audio_%04x", bstream->pid);
1027         caps = gst_caps_new_simple ("audio/x-eac3", NULL);
1028         g_free (desc);
1029         break;
1030       }
1031       desc =
1032           mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
1033           DESC_DVB_TELETEXT);
1034       if (desc) {
1035         GST_LOG ("teletext");
1036         template = gst_static_pad_template_get (&private_template);
1037         name = g_strdup_printf ("private_%04x", bstream->pid);
1038         caps = gst_caps_new_simple ("private/teletext", NULL);
1039         g_free (desc);
1040         break;
1041       }
1042       desc =
1043           mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
1044           DESC_DVB_SUBTITLING);
1045       if (desc) {
1046         GST_LOG ("subtitling");
1047         template = gst_static_pad_template_get (&private_template);
1048         name = g_strdup_printf ("private_%04x", bstream->pid);
1049         caps = gst_caps_new_simple ("subpicture/x-dvb", NULL);
1050         g_free (desc);
1051         break;
1052       }
1053       /* hack for itv hd (sid 10510, video pid 3401 */
1054       if (program->program_number == 10510 && bstream->pid == 3401) {
1055         template = gst_static_pad_template_get (&video_template);
1056         name = g_strdup_printf ("video_%04x", bstream->pid);
1057         caps = gst_caps_new_simple ("video/x-h264",
1058             "stream-format", G_TYPE_STRING, "byte-stream",
1059             "alignment", G_TYPE_STRING, "nal", NULL);
1060       }
1061       break;
1062     case ST_HDV_AUX_V:
1063       /* We don't expose those streams since they're only helper streams */
1064       /* template = gst_static_pad_template_get (&private_template); */
1065       /* name = g_strdup_printf ("private_%04x", bstream->pid); */
1066       /* caps = gst_caps_new_simple ("hdv/aux-v", NULL); */
1067       break;
1068     case ST_HDV_AUX_A:
1069       /* We don't expose those streams since they're only helper streams */
1070       /* template = gst_static_pad_template_get (&private_template); */
1071       /* name = g_strdup_printf ("private_%04x", bstream->pid); */
1072       /* caps = gst_caps_new_simple ("hdv/aux-a", NULL); */
1073       break;
1074     case ST_PRIVATE_SECTIONS:
1075     case ST_MHEG:
1076     case ST_DSMCC:
1077     case ST_DSMCC_A:
1078     case ST_DSMCC_B:
1079     case ST_DSMCC_C:
1080     case ST_DSMCC_D:
1081       MPEGTS_BIT_UNSET (base->is_pes, bstream->pid);
1082       break;
1083     case ST_AUDIO_AAC:         /* ADTS */
1084       template = gst_static_pad_template_get (&audio_template);
1085       name = g_strdup_printf ("audio_%04x", bstream->pid);
1086       caps = gst_caps_new_simple ("audio/mpeg",
1087           "mpegversion", G_TYPE_INT, 4,
1088           "stream-format", G_TYPE_STRING, "adts", NULL);
1089       break;
1090     case ST_VIDEO_MPEG4:
1091       template = gst_static_pad_template_get (&video_template);
1092       name = g_strdup_printf ("video_%04x", bstream->pid);
1093       caps = gst_caps_new_simple ("video/mpeg",
1094           "mpegversion", G_TYPE_INT, 4,
1095           "systemstream", G_TYPE_BOOLEAN, FALSE, NULL);
1096       break;
1097     case ST_VIDEO_H264:
1098       template = gst_static_pad_template_get (&video_template);
1099       name = g_strdup_printf ("video_%04x", bstream->pid);
1100       caps = gst_caps_new_simple ("video/x-h264",
1101           "stream-format", G_TYPE_STRING, "byte-stream",
1102           "alignment", G_TYPE_STRING, "nal", NULL);
1103       break;
1104     case ST_VIDEO_DIRAC:
1105       desc =
1106           mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
1107           DESC_REGISTRATION);
1108       if (desc) {
1109         if (DESC_LENGTH (desc) >= 4) {
1110           if (DESC_REGISTRATION_format_identifier (desc) == 0x64726163) {
1111             GST_LOG ("dirac");
1112             /* dirac in hex */
1113             template = gst_static_pad_template_get (&video_template);
1114             name = g_strdup_printf ("video_%04x", bstream->pid);
1115             caps = gst_caps_new_simple ("video/x-dirac", NULL);
1116           }
1117         }
1118         g_free (desc);
1119       }
1120       break;
1121     case ST_PRIVATE_EA:        /* Try to detect a VC1 stream */
1122     {
1123       desc =
1124           mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
1125           DESC_REGISTRATION);
1126       if (desc) {
1127         if (DESC_LENGTH (desc) >= 4) {
1128           if (DESC_REGISTRATION_format_identifier (desc) == DRF_ID_VC1) {
1129             GST_WARNING ("0xea private stream type found but no descriptor "
1130                 "for VC1. Assuming plain VC1.");
1131             template = gst_static_pad_template_get (&video_template);
1132             name = g_strdup_printf ("video_%04x", bstream->pid);
1133             caps = gst_caps_new_simple ("video/x-wmv",
1134                 "wmvversion", G_TYPE_INT, 3,
1135                 "format", GST_TYPE_FOURCC, GST_MAKE_FOURCC ('W', 'V', 'C', '1'),
1136                 NULL);
1137           }
1138         }
1139         g_free (desc);
1140       }
1141       break;
1142     }
1143     case ST_BD_AUDIO_AC3:
1144     {
1145       /* REGISTRATION DRF_ID_HDMV */
1146       desc = mpegts_get_descriptor_from_program (program, DESC_REGISTRATION);
1147       if (desc) {
1148         if (DESC_REGISTRATION_format_identifier (desc) == DRF_ID_HDMV) {
1149           template = gst_static_pad_template_get (&audio_template);
1150           name = g_strdup_printf ("audio_%04x", bstream->pid);
1151           caps = gst_caps_new_simple ("audio/x-eac3", NULL);
1152         }
1153         g_free (desc);
1154       }
1155       if (template)
1156         break;
1157
1158       /* DVB_ENHANCED_AC3 */
1159       desc =
1160           mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
1161           DESC_DVB_ENHANCED_AC3);
1162       if (desc) {
1163         template = gst_static_pad_template_get (&audio_template);
1164         name = g_strdup_printf ("audio_%04x", bstream->pid);
1165         caps = gst_caps_new_simple ("audio/x-eac3", NULL);
1166         g_free (desc);
1167         break;
1168       }
1169
1170       /* DVB_AC3 */
1171       desc =
1172           mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
1173           DESC_DVB_AC3);
1174       if (!desc)
1175         GST_WARNING ("AC3 stream type found but no corresponding "
1176             "descriptor to differentiate between AC3 and EAC3. "
1177             "Assuming plain AC3.");
1178       else
1179         g_free (desc);
1180       template = gst_static_pad_template_get (&audio_template);
1181       name = g_strdup_printf ("audio_%04x", bstream->pid);
1182       caps = gst_caps_new_simple ("audio/x-ac3", NULL);
1183       break;
1184     }
1185     case ST_BD_AUDIO_EAC3:
1186       template = gst_static_pad_template_get (&audio_template);
1187       name = g_strdup_printf ("audio_%04x", bstream->pid);
1188       caps = gst_caps_new_simple ("audio/x-eac3", NULL);
1189       break;
1190     case ST_PS_AUDIO_DTS:
1191       template = gst_static_pad_template_get (&audio_template);
1192       name = g_strdup_printf ("audio_%04x", bstream->pid);
1193       caps = gst_caps_new_simple ("audio/x-dts", NULL);
1194       break;
1195     case ST_PS_AUDIO_LPCM:
1196       template = gst_static_pad_template_get (&audio_template);
1197       name = g_strdup_printf ("audio_%04x", bstream->pid);
1198       caps = gst_caps_new_simple ("audio/x-lpcm", NULL);
1199       break;
1200     case ST_BD_AUDIO_LPCM:
1201       template = gst_static_pad_template_get (&audio_template);
1202       name = g_strdup_printf ("audio_%04x", bstream->pid);
1203       caps = gst_caps_new_simple ("audio/x-private-ts-lpcm", NULL);
1204       break;
1205     case ST_PS_DVD_SUBPICTURE:
1206       template = gst_static_pad_template_get (&subpicture_template);
1207       name = g_strdup_printf ("subpicture_%04x", bstream->pid);
1208       caps = gst_caps_new_simple ("video/x-dvd-subpicture", NULL);
1209       break;
1210     case ST_BD_PGS_SUBPICTURE:
1211       template = gst_static_pad_template_get (&subpicture_template);
1212       name = g_strdup_printf ("subpicture_%04x", bstream->pid);
1213       caps = gst_caps_new_simple ("subpicture/x-pgs", NULL);
1214       break;
1215     default:
1216       GST_WARNING ("Non-media stream (stream_type:0x%x). Not creating pad",
1217           bstream->stream_type);
1218       break;
1219   }
1220   if (template && name && caps) {
1221     GST_LOG ("stream:%p creating pad with name %s and caps %s", stream, name,
1222         gst_caps_to_string (caps));
1223     pad = gst_pad_new_from_template (template, name);
1224     gst_pad_use_fixed_caps (pad);
1225     gst_pad_set_caps (pad, caps);
1226     gst_pad_set_query_type_function (pad, gst_ts_demux_srcpad_query_types);
1227     gst_pad_set_query_function (pad, gst_ts_demux_srcpad_query);
1228     gst_pad_set_event_function (pad, gst_ts_demux_srcpad_event);
1229     gst_caps_unref (caps);
1230   }
1231
1232   g_free (name);
1233
1234   return pad;
1235 }
1236
1237 static void
1238 gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * bstream,
1239     MpegTSBaseProgram * program)
1240 {
1241   TSDemuxStream *stream = (TSDemuxStream *) bstream;
1242
1243   if (!stream->pad) {
1244     /* Create the pad */
1245     if (bstream->stream_type != 0xff)
1246       stream->pad = create_pad_for_stream (base, bstream, program);
1247
1248     stream->pts = GST_CLOCK_TIME_NONE;
1249     stream->dts = GST_CLOCK_TIME_NONE;
1250     stream->raw_pts = 0;
1251     stream->raw_dts = 0;
1252     stream->nb_pts_rollover = 0;
1253     stream->nb_dts_rollover = 0;
1254   }
1255   stream->flow_return = GST_FLOW_OK;
1256 }
1257
1258 static void
1259 gst_ts_demux_stream_removed (MpegTSBase * base, MpegTSBaseStream * bstream)
1260 {
1261   GstTSDemux *demux = GST_TS_DEMUX (base);
1262   TSDemuxStream *stream = (TSDemuxStream *) bstream;
1263
1264   if (stream->pad) {
1265     if (gst_pad_is_active (stream->pad)) {
1266       gboolean need_newsegment = demux->need_newsegment;
1267
1268       /* We must not send the newsegment when flushing the pending data
1269          on the removed stream. We should only push it when the newly added
1270          stream finishes parsing its PTS */
1271       demux->need_newsegment = FALSE;
1272
1273       /* Flush out all data */
1274       GST_DEBUG_OBJECT (stream->pad, "Flushing out pending data");
1275       gst_ts_demux_push_pending_data ((GstTSDemux *) base, stream);
1276
1277       demux->need_newsegment = need_newsegment;
1278
1279       GST_DEBUG_OBJECT (stream->pad, "Pushing out EOS");
1280       gst_pad_push_event (stream->pad, gst_event_new_eos ());
1281       GST_DEBUG_OBJECT (stream->pad, "Deactivating and removing pad");
1282       gst_pad_set_active (stream->pad, FALSE);
1283       gst_element_remove_pad (GST_ELEMENT_CAST (base), stream->pad);
1284     }
1285     stream->pad = NULL;
1286   }
1287   stream->flow_return = GST_FLOW_NOT_LINKED;
1288 }
1289
1290 static void
1291 activate_pad_for_stream (GstTSDemux * tsdemux, TSDemuxStream * stream)
1292 {
1293   if (stream->pad) {
1294     GST_DEBUG_OBJECT (tsdemux, "Activating pad %s:%s for stream %p",
1295         GST_DEBUG_PAD_NAME (stream->pad), stream);
1296     gst_pad_set_active (stream->pad, TRUE);
1297     gst_element_add_pad ((GstElement *) tsdemux, stream->pad);
1298     GST_DEBUG_OBJECT (stream->pad, "done adding pad");
1299   } else
1300     GST_WARNING_OBJECT (tsdemux,
1301         "stream %p (pid 0x%04x, type:0x%03x) has no pad", stream,
1302         ((MpegTSBaseStream *) stream)->pid,
1303         ((MpegTSBaseStream *) stream)->stream_type);
1304 }
1305
1306 static void
1307 gst_ts_demux_stream_flush (TSDemuxStream * stream)
1308 {
1309   gint i;
1310
1311   stream->pts = GST_CLOCK_TIME_NONE;
1312
1313   for (i = 0; i < stream->nbpending; i++)
1314     gst_buffer_unref (stream->pendingbuffers[i]);
1315   memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
1316   stream->nbpending = 0;
1317
1318   stream->current = NULL;
1319 }
1320
1321 static void
1322 gst_ts_demux_flush_streams (GstTSDemux * demux)
1323 {
1324   g_list_foreach (demux->program->stream_list,
1325       (GFunc) gst_ts_demux_stream_flush, NULL);
1326 }
1327
1328 static void
1329 gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
1330 {
1331   GstTSDemux *demux = GST_TS_DEMUX (base);
1332
1333   GST_DEBUG ("Current program %d, new program %d",
1334       demux->program_number, program->program_number);
1335
1336   if (demux->program_number == -1 ||
1337       demux->program_number == program->program_number) {
1338     GList *tmp;
1339
1340     GST_LOG ("program %d started", program->program_number);
1341     demux->program_number = program->program_number;
1342     demux->program = program;
1343
1344     /* Activate all stream pads, pads will already have been created */
1345     if (base->mode != BASE_MODE_SCANNING) {
1346       for (tmp = program->stream_list; tmp; tmp = tmp->next)
1347         activate_pad_for_stream (demux, (TSDemuxStream *) tmp->data);
1348       gst_element_no_more_pads ((GstElement *) demux);
1349     }
1350
1351     /* Inform scanner we have got our program */
1352     demux->current_program_number = program->program_number;
1353     demux->need_newsegment = TRUE;
1354   }
1355 }
1356
1357 static gboolean
1358 process_section (MpegTSBase * base)
1359 {
1360   GstTSDemux *demux = GST_TS_DEMUX (base);
1361   gboolean based;
1362   gboolean done = FALSE;
1363   MpegTSPacketizerPacket packet;
1364   MpegTSPacketizerPacketReturn pret;
1365
1366   while ((!done)
1367       && ((pret =
1368               mpegts_packetizer_next_packet (base->packetizer,
1369                   &packet)) != PACKET_NEED_MORE)) {
1370     if (G_UNLIKELY (pret == PACKET_BAD))
1371       /* bad header, skip the packet */
1372       goto next;
1373
1374     /* base PSI data */
1375     if (packet.payload != NULL && mpegts_base_is_psi (base, &packet)) {
1376       MpegTSPacketizerSection section;
1377
1378       based =
1379           mpegts_packetizer_push_section (base->packetizer, &packet, &section);
1380       if (G_UNLIKELY (!based))
1381         /* bad section data */
1382         goto next;
1383
1384       if (G_LIKELY (section.complete)) {
1385         /* section complete */
1386         GST_DEBUG ("Section Complete");
1387         based = mpegts_base_handle_psi (base, &section);
1388         gst_buffer_unref (section.buffer);
1389         if (G_UNLIKELY (!based))
1390           /* bad PSI table */
1391           goto next;
1392
1393       }
1394
1395       if (demux->program != NULL) {
1396         GST_DEBUG ("Got Program");
1397         done = TRUE;
1398       }
1399     }
1400   next:
1401     mpegts_packetizer_clear_packet (base->packetizer, &packet);
1402   }
1403   return done;
1404 }
1405
1406 static gboolean
1407 process_pes (MpegTSBase * base, TSPcrOffset * pcroffset)
1408 {
1409   gboolean based, done = FALSE;
1410   MpegTSPacketizerPacket packet;
1411   MpegTSPacketizerPacketReturn pret;
1412   GstTSDemux *demux = GST_TS_DEMUX (base);
1413   guint16 pcr_pid = 0;
1414
1415   while ((!done)
1416       && ((pret =
1417               mpegts_packetizer_next_packet (base->packetizer,
1418                   &packet)) != PACKET_NEED_MORE)) {
1419     if (G_UNLIKELY (pret == PACKET_BAD))
1420       /* bad header, skip the packet */
1421       goto next;
1422
1423     if (demux->program != NULL) {
1424       pcr_pid = demux->program->pcr_pid;
1425     }
1426
1427     /* base PSI data */
1428     if (packet.payload != NULL && mpegts_base_is_psi (base, &packet)) {
1429       MpegTSPacketizerSection section;
1430
1431       based =
1432           mpegts_packetizer_push_section (base->packetizer, &packet, &section);
1433       if (G_UNLIKELY (!based))
1434         /* bad section data */
1435         goto next;
1436
1437       if (G_LIKELY (section.complete)) {
1438         /* section complete */
1439         GST_DEBUG ("Section Complete");
1440         based = mpegts_base_handle_psi (base, &section);
1441         gst_buffer_unref (section.buffer);
1442         if (G_UNLIKELY (!based))
1443           /* bad PSI table */
1444           goto next;
1445
1446       }
1447     }
1448     if (packet.pid == pcr_pid && (packet.adaptation_field_control & 0x02)
1449         && (packet.afc_flags & MPEGTS_AFC_PCR_FLAG)) {
1450       GST_DEBUG ("PCR[0x%x]: %" G_GINT64_FORMAT, packet.pid, packet.pcr);
1451       pcroffset->pcr = packet.pcr;
1452       pcroffset->offset = packet.offset;
1453       done = TRUE;
1454     }
1455   next:
1456     mpegts_packetizer_clear_packet (base->packetizer, &packet);
1457   }
1458   return done;
1459 }
1460
1461 static GstFlowReturn
1462 find_pcr_packet (MpegTSBase * base, guint64 offset, gint64 length,
1463     TSPcrOffset * pcroffset)
1464 {
1465   GstFlowReturn ret = GST_FLOW_OK;
1466   GstTSDemux *demux = GST_TS_DEMUX (base);
1467   MpegTSBaseProgram *program;
1468   GstBuffer *buf;
1469   gboolean done = FALSE;
1470   guint64 scan_offset = 0;
1471
1472   GST_DEBUG ("Scanning for PCR between:%" G_GINT64_FORMAT
1473       " and the end:%" G_GINT64_FORMAT, offset, offset + length);
1474
1475   /* Get the program */
1476   program = demux->program;
1477   if (G_UNLIKELY (program == NULL))
1478     return GST_FLOW_ERROR;
1479
1480   mpegts_packetizer_flush (base->packetizer);
1481   if (offset >= 4 && base->packetizer->packet_size == MPEGTS_M2TS_PACKETSIZE)
1482     offset -= 4;
1483
1484   while (!done && scan_offset < length) {
1485     ret =
1486         gst_pad_pull_range (base->sinkpad, offset + scan_offset,
1487         50 * MPEGTS_MAX_PACKETSIZE, &buf);
1488     if (ret != GST_FLOW_OK)
1489       goto beach;
1490     mpegts_packetizer_push (base->packetizer, buf);
1491     done = process_pes (base, pcroffset);
1492     scan_offset += 50 * MPEGTS_MAX_PACKETSIZE;
1493   }
1494
1495   if (!done || scan_offset >= length) {
1496     GST_WARNING ("No PCR found!");
1497     ret = GST_FLOW_ERROR;
1498     goto beach;
1499   }
1500
1501 beach:
1502   mpegts_packetizer_flush (base->packetizer);
1503   return ret;
1504 }
1505
1506 static gboolean
1507 verify_timestamps (MpegTSBase * base, TSPcrOffset * first, TSPcrOffset * last)
1508 {
1509   GstTSDemux *demux = GST_TS_DEMUX (base);
1510   guint64 length = 4000 * MPEGTS_MAX_PACKETSIZE;
1511   guint64 offset = PCR_WRAP_SIZE_128KBPS;
1512
1513   demux->index =
1514       g_array_sized_new (TRUE, TRUE, sizeof (*first),
1515       2 + 1 + ((last->offset - first->offset) / PCR_WRAP_SIZE_128KBPS));
1516
1517   first->gsttime = PCRTIME_TO_GSTTIME (first->pcr);
1518   demux->index = g_array_append_val (demux->index, *first);
1519   demux->index_size++;
1520   demux->first_pcr = *first;
1521   demux->index_pcr = *first;
1522   GST_DEBUG ("first time: %" GST_TIME_FORMAT " pcr: %" GST_TIME_FORMAT
1523       " offset: %" G_GINT64_FORMAT
1524       " last  pcr: %" GST_TIME_FORMAT " offset: %" G_GINT64_FORMAT,
1525       GST_TIME_ARGS (first->gsttime),
1526       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (first->pcr)), first->offset,
1527       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (last->pcr)), last->offset);
1528
1529   while (offset + length < last->offset) {
1530     TSPcrOffset half;
1531     GstFlowReturn ret;
1532     gint tries = 0;
1533
1534   retry:
1535     ret = find_pcr_packet (base, offset, length, &half);
1536     if (G_UNLIKELY (ret != GST_FLOW_OK)) {
1537       GST_WARNING ("no pcr found, retrying");
1538       if (tries++ < 3) {
1539         offset += length;
1540         length *= 2;
1541         goto retry;
1542       }
1543       return FALSE;
1544     }
1545
1546     half.gsttime = calculate_gsttime (first, half.pcr);
1547
1548     GST_DEBUG ("add half time: %" GST_TIME_FORMAT " pcr: %" GST_TIME_FORMAT
1549         " offset: %" G_GINT64_FORMAT,
1550         GST_TIME_ARGS (half.gsttime),
1551         GST_TIME_ARGS (PCRTIME_TO_GSTTIME (half.pcr)), half.offset);
1552     demux->index = g_array_append_val (demux->index, half);
1553     demux->index_size++;
1554
1555     length = 4000 * MPEGTS_MAX_PACKETSIZE;
1556     offset += PCR_WRAP_SIZE_128KBPS;
1557     *first = half;
1558   }
1559
1560   last->gsttime = calculate_gsttime (first, last->pcr);
1561
1562   GST_DEBUG ("add last time: %" GST_TIME_FORMAT " pcr: %" GST_TIME_FORMAT
1563       " offset: %" G_GINT64_FORMAT,
1564       GST_TIME_ARGS (last->gsttime),
1565       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (last->pcr)), last->offset);
1566
1567   demux->index = g_array_append_val (demux->index, *last);
1568   demux->index_size++;
1569
1570   demux->last_pcr = *last;
1571   return TRUE;
1572 }
1573
1574 static GstFlowReturn
1575 find_timestamps (MpegTSBase * base, guint64 initoff, guint64 * offset)
1576 {
1577
1578   GstFlowReturn ret = GST_FLOW_OK;
1579   GstBuffer *buf;
1580   gboolean done = FALSE;
1581   GstFormat format = GST_FORMAT_BYTES;
1582   gint64 total_bytes;
1583   guint64 scan_offset;
1584   guint i = 0;
1585   TSPcrOffset initial, final;
1586   GstTSDemux *demux = GST_TS_DEMUX (base);
1587
1588   GST_DEBUG ("Scanning for timestamps");
1589
1590   /* Flush what remained from before */
1591   mpegts_packetizer_clear (base->packetizer);
1592
1593   /* Start scanning from know PAT offset */
1594   while (!done) {
1595     ret =
1596         gst_pad_pull_range (base->sinkpad, i * 50 * MPEGTS_MAX_PACKETSIZE,
1597         50 * MPEGTS_MAX_PACKETSIZE, &buf);
1598     if (ret != GST_FLOW_OK)
1599       goto beach;
1600     mpegts_packetizer_push (base->packetizer, buf);
1601     done = process_section (base);
1602     i++;
1603   }
1604   mpegts_packetizer_clear (base->packetizer);
1605   done = FALSE;
1606   i = 1;
1607
1608
1609   *offset = base->seek_offset;
1610
1611   /* Search for the first PCRs */
1612   ret = process_pcr (base, base->first_pat_offset, &initial, 10, TRUE);
1613
1614   if (ret != GST_FLOW_OK && ret != GST_FLOW_UNEXPECTED) {
1615     GST_WARNING ("Problem getting initial PCRs");
1616     goto beach;
1617   }
1618
1619   mpegts_packetizer_clear (base->packetizer);
1620   /* Remove current program so we ensure looking for a PAT when scanning the 
1621    * for the final PCR */
1622   gst_structure_free (base->pat);
1623   base->pat = NULL;
1624   mpegts_base_remove_program (base, demux->current_program_number);
1625   demux->program = NULL;
1626
1627   /* Find end position */
1628   if (G_UNLIKELY (!gst_pad_query_peer_duration (base->sinkpad, &format,
1629               &total_bytes) || format != GST_FORMAT_BYTES)) {
1630     GST_WARNING_OBJECT (base, "Couldn't get upstream size in bytes");
1631     ret = GST_FLOW_ERROR;
1632     mpegts_packetizer_clear (base->packetizer);
1633     return ret;
1634   }
1635   GST_DEBUG ("Upstream is %" G_GINT64_FORMAT " bytes", total_bytes);
1636
1637
1638   /* Let's start scanning 4000 packets from the end */
1639   scan_offset = MAX (188, total_bytes - 4000 * MPEGTS_MAX_PACKETSIZE);
1640
1641   GST_DEBUG ("Scanning for last sync point between:%" G_GINT64_FORMAT
1642       " and the end:%" G_GINT64_FORMAT, scan_offset, total_bytes);
1643   while ((!done) && (scan_offset < total_bytes)) {
1644     ret =
1645         gst_pad_pull_range (base->sinkpad,
1646         scan_offset, 50 * MPEGTS_MAX_PACKETSIZE, &buf);
1647     if (ret != GST_FLOW_OK)
1648       goto beach;
1649
1650     mpegts_packetizer_push (base->packetizer, buf);
1651     done = process_section (base);
1652     scan_offset += 50 * MPEGTS_MAX_PACKETSIZE;
1653   }
1654
1655   mpegts_packetizer_clear (base->packetizer);
1656
1657   GST_DEBUG ("Searching PCR");
1658   ret =
1659       process_pcr (base, scan_offset - 50 * MPEGTS_MAX_PACKETSIZE, &final, 10,
1660       FALSE);
1661
1662   if (ret != GST_FLOW_OK) {
1663     GST_DEBUG ("Problem getting last PCRs");
1664     goto beach;
1665   }
1666
1667   verify_timestamps (base, &initial, &final);
1668
1669   gst_segment_set_duration (&demux->segment, GST_FORMAT_TIME,
1670       demux->last_pcr.gsttime - demux->first_pcr.gsttime);
1671   demux->duration = demux->last_pcr.gsttime - demux->first_pcr.gsttime;
1672   GST_DEBUG ("Done, duration:%" GST_TIME_FORMAT,
1673       GST_TIME_ARGS (demux->duration));
1674
1675 beach:
1676
1677   mpegts_packetizer_clear (base->packetizer);
1678   /* Remove current program */
1679   if (base->pat) {
1680     gst_structure_free (base->pat);
1681     base->pat = NULL;
1682   }
1683   mpegts_base_remove_program (base, demux->current_program_number);
1684   demux->program = NULL;
1685
1686   return ret;
1687 }
1688
1689 static GstFlowReturn
1690 process_pcr (MpegTSBase * base, guint64 initoff, TSPcrOffset * pcroffset,
1691     guint numpcr, gboolean isinitial)
1692 {
1693   GstTSDemux *demux = GST_TS_DEMUX (base);
1694   GstFlowReturn ret = GST_FLOW_OK;
1695   MpegTSBaseProgram *program;
1696   GstBuffer *buf;
1697   guint nbpcr, i = 0;
1698   guint32 pcrmask, pcrpattern;
1699   guint64 pcrs[50];
1700   guint64 pcroffs[50];
1701   GstByteReader br;
1702
1703   GST_DEBUG ("initoff:%" G_GUINT64_FORMAT ", numpcr:%d, isinitial:%d",
1704       initoff, numpcr, isinitial);
1705
1706   /* Get the program */
1707   program = demux->program;
1708   if (G_UNLIKELY (program == NULL))
1709     return GST_FLOW_ERROR;
1710
1711   /* First find the first X PCR */
1712   nbpcr = 0;
1713   /* Mask/pattern is PID:PCR_PID, AFC&0x02 */
1714   /* sync_byte (0x47)                   : 8bits => 0xff
1715    * transport_error_indicator          : 1bit  ACTIVATE
1716    * payload_unit_start_indicator       : 1bit  IGNORE
1717    * transport_priority                 : 1bit  IGNORE
1718    * PID                                : 13bit => 0x9f 0xff
1719    * transport_scrambling_control       : 2bit
1720    * adaptation_field_control           : 2bit
1721    * continuity_counter                 : 4bit  => 0x30
1722    */
1723   pcrmask = 0xff9fff20;
1724   pcrpattern = 0x47000020 | ((program->pcr_pid & 0x1fff) << 8);
1725
1726   for (i = 0; (i < 20) && (nbpcr < numpcr); i++) {
1727     guint offset, size;
1728
1729     ret =
1730         gst_pad_pull_range (base->sinkpad,
1731         initoff + i * 500 * base->packetsize, 500 * base->packetsize, &buf);
1732
1733     if (G_UNLIKELY (ret != GST_FLOW_OK))
1734       goto beach;
1735
1736     gst_byte_reader_init_from_buffer (&br, buf);
1737
1738     offset = 0;
1739     size = GST_BUFFER_SIZE (buf);
1740
1741   resync:
1742     offset = gst_byte_reader_masked_scan_uint32 (&br, 0xff000000, 0x47000000,
1743         0, base->packetsize);
1744
1745     if (offset == -1)
1746       continue;
1747
1748     while ((nbpcr < numpcr) && (size >= base->packetsize)) {
1749
1750       guint32 header = GST_READ_UINT32_BE (br.data + offset);
1751
1752       if ((header >> 24) != 0x47)
1753         goto resync;
1754
1755       if ((header & pcrmask) != pcrpattern) {
1756         /* Move offset forward by 1 packet */
1757         size -= base->packetsize;
1758         offset += base->packetsize;
1759         continue;
1760       }
1761
1762       /* Potential PCR */
1763 /*      GST_DEBUG ("offset %" G_GUINT64_FORMAT, GST_BUFFER_OFFSET (buf) + offset);
1764       GST_MEMDUMP ("something", GST_BUFFER_DATA (buf) + offset, 16);*/
1765       if ((*(br.data + offset + 5)) & MPEGTS_AFC_PCR_FLAG) {
1766         guint64 lpcr = mpegts_packetizer_compute_pcr (br.data + offset + 6);
1767
1768         GST_INFO ("Found PCR %" G_GUINT64_FORMAT " %" GST_TIME_FORMAT
1769             " at offset %" G_GUINT64_FORMAT, lpcr,
1770             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (lpcr)),
1771             GST_BUFFER_OFFSET (buf) + offset);
1772         pcrs[nbpcr] = lpcr;
1773         pcroffs[nbpcr] = GST_BUFFER_OFFSET (buf) + offset;
1774         /* Safeguard against bogus PCR (by detecting if it's the same as the
1775          * previous one or wheter the difference with the previous one is
1776          * greater than 10mins */
1777         if (nbpcr > 1) {
1778           if (pcrs[nbpcr] == pcrs[nbpcr - 1]) {
1779             GST_WARNING ("Found same PCR at different offset");
1780           } else if (pcrs[nbpcr] < pcrs[nbpcr - 1]) {
1781             GST_WARNING ("Found PCR wraparound");
1782             nbpcr += 1;
1783           } else if ((pcrs[nbpcr] - pcrs[nbpcr - 1]) >
1784               (guint64) 10 * 60 * 27000000) {
1785             GST_WARNING ("PCR differs with previous PCR by more than 10 mins");
1786           } else
1787             nbpcr += 1;
1788         } else
1789           nbpcr += 1;
1790       }
1791       /* Move offset forward by 1 packet */
1792       size -= base->packetsize;
1793       offset += base->packetsize;
1794     }
1795   }
1796
1797 beach:
1798   GST_DEBUG ("Found %d PCR", nbpcr);
1799   if (nbpcr) {
1800     if (isinitial) {
1801       pcroffset->pcr = pcrs[0];
1802       pcroffset->offset = pcroffs[0];
1803     } else {
1804       pcroffset->pcr = pcrs[nbpcr - 1];
1805       pcroffset->offset = pcroffs[nbpcr - 1];
1806     }
1807     if (nbpcr > 1) {
1808       GST_DEBUG ("pcrdiff:%" GST_TIME_FORMAT " offsetdiff %" G_GUINT64_FORMAT,
1809           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (pcrs[nbpcr - 1] - pcrs[0])),
1810           pcroffs[nbpcr - 1] - pcroffs[0]);
1811       GST_DEBUG ("Estimated bitrate %" G_GUINT64_FORMAT,
1812           gst_util_uint64_scale (GST_SECOND, pcroffs[nbpcr - 1] - pcroffs[0],
1813               PCRTIME_TO_GSTTIME (pcrs[nbpcr - 1] - pcrs[0])));
1814       GST_DEBUG ("Average PCR interval %" G_GUINT64_FORMAT,
1815           (pcroffs[nbpcr - 1] - pcroffs[0]) / nbpcr);
1816     }
1817   }
1818   /* Swallow any errors if it happened during the end scanning */
1819   if (!isinitial)
1820     ret = GST_FLOW_OK;
1821   return ret;
1822 }
1823
1824
1825
1826
1827 static inline void
1828 gst_ts_demux_record_pcr (GstTSDemux * demux, TSDemuxStream * stream,
1829     guint64 pcr, guint64 offset)
1830 {
1831   MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
1832
1833   GST_LOG ("pid 0x%04x pcr:%" GST_TIME_FORMAT " at offset %"
1834       G_GUINT64_FORMAT, bs->pid,
1835       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (pcr)), offset);
1836
1837   if (G_LIKELY (bs->pid == demux->program->pcr_pid)) {
1838     demux->cur_pcr.gsttime = GST_CLOCK_TIME_NONE;
1839     demux->cur_pcr.offset = offset;
1840     demux->cur_pcr.pcr = pcr;
1841     /* set first_pcr in push mode */
1842     if (G_UNLIKELY (!demux->first_pcr.gsttime == GST_CLOCK_TIME_NONE)) {
1843       demux->first_pcr.gsttime = PCRTIME_TO_GSTTIME (pcr);
1844       demux->first_pcr.offset = offset;
1845       demux->first_pcr.pcr = pcr;
1846     }
1847   }
1848
1849   if (G_UNLIKELY (demux->emit_statistics)) {
1850     GstStructure *st;
1851     st = gst_structure_id_empty_new (QUARK_TSDEMUX);
1852     gst_structure_id_set (st,
1853         QUARK_PID, G_TYPE_UINT, bs->pid,
1854         QUARK_OFFSET, G_TYPE_UINT64, offset, QUARK_PCR, G_TYPE_UINT64, pcr,
1855         NULL);
1856     gst_element_post_message (GST_ELEMENT_CAST (demux),
1857         gst_message_new_element (GST_OBJECT (demux), st));
1858   }
1859 }
1860
1861 static inline void
1862 gst_ts_demux_record_opcr (GstTSDemux * demux, TSDemuxStream * stream,
1863     guint64 opcr, guint64 offset)
1864 {
1865   MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
1866
1867   GST_LOG ("pid 0x%04x opcr:%" GST_TIME_FORMAT " at offset %"
1868       G_GUINT64_FORMAT, bs->pid,
1869       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (opcr)), offset);
1870
1871   if (G_UNLIKELY (demux->emit_statistics)) {
1872     GstStructure *st;
1873     st = gst_structure_id_empty_new (QUARK_TSDEMUX);
1874     gst_structure_id_set (st,
1875         QUARK_PID, G_TYPE_UINT, bs->pid,
1876         QUARK_OFFSET, G_TYPE_UINT64, offset,
1877         QUARK_OPCR, G_TYPE_UINT64, opcr, NULL);
1878     gst_element_post_message (GST_ELEMENT_CAST (demux),
1879         gst_message_new_element (GST_OBJECT (demux), st));
1880   }
1881 }
1882
1883 static inline void
1884 gst_ts_demux_record_pts (GstTSDemux * demux, TSDemuxStream * stream,
1885     guint64 pts, guint64 offset)
1886 {
1887   MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
1888
1889   GST_LOG ("pid 0x%04x pts:%" G_GUINT64_FORMAT " at offset %"
1890       G_GUINT64_FORMAT, bs->pid, pts, offset);
1891
1892   if (G_UNLIKELY (GST_CLOCK_TIME_IS_VALID (stream->pts) &&
1893           ABSDIFF (stream->raw_pts, pts) > 900000)) {
1894     /* Detect rollover if diff > 10s */
1895     GST_LOG ("Detected rollover (previous:%" G_GUINT64_FORMAT " new:%"
1896         G_GUINT64_FORMAT ")", stream->raw_pts, pts);
1897     if (pts < stream->raw_pts) {
1898       /* Forward rollover */
1899       GST_LOG ("Forward rollover, incrementing nb_pts_rollover");
1900       stream->nb_pts_rollover++;
1901     } else {
1902       /* Reverse rollover */
1903       GST_LOG ("Reverse rollover, decrementing nb_pts_rollover");
1904       stream->nb_pts_rollover--;
1905     }
1906   }
1907
1908   /* Compute PTS in GstClockTime */
1909   stream->raw_pts = pts;
1910   stream->pts =
1911       MPEGTIME_TO_GSTTIME (pts + stream->nb_pts_rollover * PTS_DTS_MAX_VALUE);
1912
1913   GST_LOG ("pid 0x%04x Stored PTS %" G_GUINT64_FORMAT " (%" GST_TIME_FORMAT ")",
1914       bs->pid, stream->raw_pts, GST_TIME_ARGS (stream->pts));
1915
1916
1917   if (G_UNLIKELY (demux->emit_statistics)) {
1918     GstStructure *st;
1919     st = gst_structure_id_empty_new (QUARK_TSDEMUX);
1920     gst_structure_id_set (st,
1921         QUARK_PID, G_TYPE_UINT, bs->pid,
1922         QUARK_OFFSET, G_TYPE_UINT64, offset, QUARK_PTS, G_TYPE_UINT64, pts,
1923         NULL);
1924     gst_element_post_message (GST_ELEMENT_CAST (demux),
1925         gst_message_new_element (GST_OBJECT (demux), st));
1926   }
1927 }
1928
1929 static inline void
1930 gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream,
1931     guint64 dts, guint64 offset)
1932 {
1933   MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
1934
1935   GST_LOG ("pid 0x%04x dts:%" G_GUINT64_FORMAT " at offset %"
1936       G_GUINT64_FORMAT, bs->pid, dts, offset);
1937
1938   if (G_UNLIKELY (GST_CLOCK_TIME_IS_VALID (stream->dts) &&
1939           ABSDIFF (stream->raw_dts, dts) > 900000)) {
1940     /* Detect rollover if diff > 10s */
1941     GST_LOG ("Detected rollover (previous:%" G_GUINT64_FORMAT " new:%"
1942         G_GUINT64_FORMAT ")", stream->raw_dts, dts);
1943     if (dts < stream->raw_dts) {
1944       /* Forward rollover */
1945       GST_LOG ("Forward rollover, incrementing nb_dts_rollover");
1946       stream->nb_dts_rollover++;
1947     } else {
1948       /* Reverse rollover */
1949       GST_LOG ("Reverse rollover, decrementing nb_dts_rollover");
1950       stream->nb_dts_rollover--;
1951     }
1952   }
1953
1954   /* Compute DTS in GstClockTime */
1955   stream->raw_dts = dts;
1956   stream->dts =
1957       MPEGTIME_TO_GSTTIME (dts + stream->nb_dts_rollover * PTS_DTS_MAX_VALUE);
1958
1959   GST_LOG ("pid 0x%04x Stored DTS %" G_GUINT64_FORMAT " (%" GST_TIME_FORMAT ")",
1960       bs->pid, stream->raw_dts, GST_TIME_ARGS (stream->dts));
1961
1962   if (G_UNLIKELY (demux->emit_statistics)) {
1963     GstStructure *st;
1964     st = gst_structure_id_empty_new (QUARK_TSDEMUX);
1965     gst_structure_id_set (st,
1966         QUARK_PID, G_TYPE_UINT, bs->pid,
1967         QUARK_OFFSET, G_TYPE_UINT64, offset, QUARK_DTS, G_TYPE_UINT64, dts,
1968         NULL);
1969     gst_element_post_message (GST_ELEMENT_CAST (demux),
1970         gst_message_new_element (GST_OBJECT (demux), st));
1971   }
1972 }
1973
1974 static inline GstClockTime
1975 calc_gsttime_from_pts (TSPcrOffset * start, guint64 pts)
1976 {
1977   GstClockTime time = start->gsttime - PCRTIME_TO_GSTTIME (start->pcr);
1978
1979   if (start->pcr > pts * 300)
1980     time += PCRTIME_TO_GSTTIME (PCR_MAX_VALUE) + MPEGTIME_TO_GSTTIME (pts);
1981   else
1982     time += MPEGTIME_TO_GSTTIME (pts);
1983
1984   return time;
1985 }
1986
1987 #if 0
1988 static gint
1989 TSPcrOffset_find_offset (gconstpointer a, gconstpointer b, gpointer user_data)
1990 {
1991   if (((TSPcrOffset *) a)->offset < ((TSPcrOffset *) b)->offset)
1992     return -1;
1993   else if (((TSPcrOffset *) a)->offset > ((TSPcrOffset *) b)->offset)
1994     return 1;
1995   else
1996     return 0;
1997 }
1998 #endif
1999
2000 static GstFlowReturn
2001 gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream)
2002 {
2003   MpegTSBase *base = (MpegTSBase *) demux;
2004   PESHeader header;
2005   GstFlowReturn res = GST_FLOW_OK;
2006   gint offset = 0;
2007   guint8 *data;
2008   guint32 length;
2009   guint64 bufferoffset;
2010   PESParsingResult parseres;
2011
2012   data = GST_BUFFER_DATA (stream->pendingbuffers[0]);
2013   length = GST_BUFFER_SIZE (stream->pendingbuffers[0]);
2014   bufferoffset = GST_BUFFER_OFFSET (stream->pendingbuffers[0]);
2015
2016   GST_MEMDUMP ("Header buffer", data, MIN (length, 32));
2017
2018   parseres = mpegts_parse_pes_header (data, length, &header, &offset);
2019   if (G_UNLIKELY (parseres == PES_PARSING_NEED_MORE))
2020     goto discont;
2021   if (G_UNLIKELY (parseres == PES_PARSING_BAD)) {
2022     GST_WARNING ("Error parsing PES header. pid: 0x%x stream_type: 0x%x",
2023         stream->stream.pid, stream->stream.stream_type);
2024     goto discont;
2025   }
2026
2027   if (header.DTS != -1)
2028     gst_ts_demux_record_dts (demux, stream, header.DTS, bufferoffset);
2029
2030   if (header.PTS != -1) {
2031     gst_ts_demux_record_pts (demux, stream, header.PTS, bufferoffset);
2032
2033 #if 0
2034     /* WTH IS THIS ??? */
2035     if (demux->index_pcr.offset + PCR_WRAP_SIZE_128KBPS + 1000 * 128 < offset
2036         || (demux->index_pcr.offset > offset)) {
2037       /* find next entry */
2038       TSPcrOffset *next;
2039       demux->index_pcr.offset = offset;
2040       next = gst_util_array_binary_search (demux->index->data,
2041           demux->index_size, sizeof (*next), TSPcrOffset_find_offset,
2042           GST_SEARCH_MODE_BEFORE, &demux->index_pcr, NULL);
2043       if (next) {
2044         GST_INFO ("new index_pcr %" GST_TIME_FORMAT " offset: %"
2045             G_GINT64_FORMAT, GST_TIME_ARGS (next->gsttime), next->offset);
2046
2047         demux->index_pcr = *next;
2048       }
2049     }
2050     time = calc_gsttime_from_pts (&demux->index_pcr, pts);
2051 #endif
2052
2053     GST_DEBUG_OBJECT (base, "stream PTS %" GST_TIME_FORMAT,
2054         GST_TIME_ARGS (stream->pts));
2055
2056     /* safe default if insufficient upstream info */
2057     if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (base->in_gap) &&
2058             GST_CLOCK_TIME_IS_VALID (base->first_buf_ts) &&
2059             base->mode == BASE_MODE_PUSHING &&
2060             base->segment.format == GST_FORMAT_TIME)) {
2061       /* Find the earliest current PTS we're going to push */
2062       GstClockTime firstpts = GST_CLOCK_TIME_NONE;
2063       GList *tmp;
2064
2065       for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
2066         TSDemuxStream *pstream = (TSDemuxStream *) tmp->data;
2067         if (!GST_CLOCK_TIME_IS_VALID (firstpts) || pstream->pts < firstpts)
2068           firstpts = pstream->pts;
2069       }
2070
2071       base->in_gap = base->first_buf_ts - firstpts;
2072       GST_DEBUG_OBJECT (base, "upstream segment start %" GST_TIME_FORMAT
2073           ", first buffer timestamp: %" GST_TIME_FORMAT
2074           ", first PTS: %" GST_TIME_FORMAT
2075           ", interpolation gap: %" GST_TIME_FORMAT,
2076           GST_TIME_ARGS (base->segment.start),
2077           GST_TIME_ARGS (base->first_buf_ts), GST_TIME_ARGS (firstpts),
2078           GST_TIME_ARGS (base->in_gap));
2079     }
2080
2081     if (!GST_CLOCK_TIME_IS_VALID (base->in_gap))
2082       base->in_gap = 0;
2083
2084     GST_BUFFER_TIMESTAMP (stream->pendingbuffers[0]) =
2085         stream->pts + base->in_gap;
2086   }
2087
2088   /* Remove PES headers */
2089   GST_DEBUG ("Moving data forward  by %d bytes", header.header_size);
2090   GST_BUFFER_DATA (stream->pendingbuffers[0]) += header.header_size;
2091   GST_BUFFER_SIZE (stream->pendingbuffers[0]) -= header.header_size;
2092
2093   /* FIXME : responsible for switching to PENDING_PACKET_BUFFER and
2094    * creating the bufferlist */
2095   if (1) {
2096     /* Append to the buffer list */
2097     if (G_UNLIKELY (stream->current == NULL)) {
2098       guint8 i;
2099
2100       /* Create a new bufferlist */
2101       stream->current = gst_buffer_list_new ();
2102       stream->currentit = gst_buffer_list_iterate (stream->current);
2103       stream->currentlist = NULL;
2104       gst_buffer_list_iterator_add_group (stream->currentit);
2105
2106       /* Push pending buffers into the list */
2107       for (i = stream->nbpending; i; i--)
2108         stream->currentlist =
2109             g_list_prepend (stream->currentlist, stream->pendingbuffers[i - 1]);
2110       memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
2111       stream->nbpending = 0;
2112     }
2113     stream->state = PENDING_PACKET_BUFFER;
2114   }
2115
2116   return res;
2117
2118 discont:
2119   stream->state = PENDING_PACKET_DISCONT;
2120   return res;
2121 }
2122
2123  /* ONLY CALL THIS:
2124   * * WITH packet->payload != NULL
2125   * * WITH pending/current flushed out if beginning of new PES packet
2126   */
2127 static inline void
2128 gst_ts_demux_queue_data (GstTSDemux * demux, TSDemuxStream * stream,
2129     MpegTSPacketizerPacket * packet)
2130 {
2131   GstBuffer *buf;
2132
2133   GST_DEBUG ("state:%d", stream->state);
2134
2135   buf = packet->buffer;
2136   /* HACK : Instead of creating a new buffer, we just modify the data/size
2137    * of the buffer to point to the payload */
2138   GST_BUFFER_DATA (buf) = packet->payload;
2139   GST_BUFFER_SIZE (buf) = packet->data_end - packet->payload;
2140
2141   if (stream->state == PENDING_PACKET_EMPTY) {
2142     if (G_UNLIKELY (!packet->payload_unit_start_indicator)) {
2143       stream->state = PENDING_PACKET_DISCONT;
2144       GST_WARNING ("Didn't get the first packet of this PES");
2145     } else {
2146       GST_LOG ("EMPTY=>HEADER");
2147       stream->state = PENDING_PACKET_HEADER;
2148       if (stream->pad) {
2149         GST_DEBUG ("Setting pad caps on buffer %p", buf);
2150         gst_buffer_set_caps (buf, GST_PAD_CAPS (stream->pad));
2151       }
2152     }
2153   }
2154
2155   if (stream->state == PENDING_PACKET_HEADER) {
2156     GST_LOG ("HEADER: appending data to array");
2157     /* Append to the array */
2158     stream->pendingbuffers[stream->nbpending++] = buf;
2159
2160     /* parse the header */
2161     gst_ts_demux_parse_pes_header (demux, stream);
2162   } else if (stream->state == PENDING_PACKET_BUFFER) {
2163     GST_LOG ("BUFFER: appending data to bufferlist");
2164     stream->currentlist = g_list_prepend (stream->currentlist, buf);
2165   }
2166
2167
2168   return;
2169 }
2170
2171 static void
2172 calculate_and_push_newsegment (GstTSDemux * demux, TSDemuxStream * stream)
2173 {
2174   MpegTSBase *base = (MpegTSBase *) demux;
2175   GstEvent *newsegmentevent;
2176   gint64 start = 0, stop = GST_CLOCK_TIME_NONE, position = 0;
2177   GstClockTime firstpts = GST_CLOCK_TIME_NONE;
2178   GList *tmp;
2179
2180   GST_DEBUG ("Creating new newsegment for stream %p", stream);
2181
2182   /* Outgoing newsegment values
2183    * start    : The first/start PTS
2184    * stop     : The last PTS (or -1)
2185    * position : The stream time corresponding to start
2186    *
2187    * Except for live mode with incoming GST_TIME_FORMAT newsegment where
2188    * it is the same values as that incoming newsegment (and we convert the
2189    * PTS to that remote clock).
2190    */
2191
2192   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
2193     TSDemuxStream *pstream = (TSDemuxStream *) tmp->data;
2194
2195     if (!GST_CLOCK_TIME_IS_VALID (firstpts) || pstream->pts < firstpts)
2196       firstpts = pstream->pts;
2197   }
2198
2199   if (base->mode == BASE_MODE_PUSHING) {
2200     /* FIXME : We're just ignore the upstream format for the time being */
2201     /* FIXME : We should use base->segment.format and a upstream latency query
2202      * to decide if we need to use live values or not */
2203     GST_DEBUG ("push-based. base Segment start:%" GST_TIME_FORMAT " duration:%"
2204         GST_TIME_FORMAT ", stop:%" GST_TIME_FORMAT ", time:%" GST_TIME_FORMAT,
2205         GST_TIME_ARGS (base->segment.start),
2206         GST_TIME_ARGS (base->segment.duration),
2207         GST_TIME_ARGS (base->segment.stop), GST_TIME_ARGS (base->segment.time));
2208     GST_DEBUG ("push-based. demux Segment start:%" GST_TIME_FORMAT " duration:%"
2209         GST_TIME_FORMAT ", stop:%" GST_TIME_FORMAT ", time:%" GST_TIME_FORMAT,
2210         GST_TIME_ARGS (demux->segment.start),
2211         GST_TIME_ARGS (demux->segment.duration),
2212         GST_TIME_ARGS (demux->segment.stop),
2213         GST_TIME_ARGS (demux->segment.time));
2214
2215     GST_DEBUG ("stream pts: %" GST_TIME_FORMAT " first pts: %" GST_TIME_FORMAT,
2216         GST_TIME_ARGS (stream->pts), GST_TIME_ARGS (firstpts));
2217
2218     if (base->segment.format == GST_FORMAT_TIME) {
2219       start = base->segment.start;
2220       stop = base->segment.stop;
2221     }
2222     /* Shift the start depending on our position in the stream */
2223     start += firstpts + base->in_gap - base->first_buf_ts;
2224     position = start;
2225   } else {
2226     /* pull mode */
2227     GST_DEBUG ("pull-based. Segment start:%" GST_TIME_FORMAT " duration:%"
2228         GST_TIME_FORMAT ", time:%" GST_TIME_FORMAT,
2229         GST_TIME_ARGS (demux->segment.start),
2230         GST_TIME_ARGS (demux->segment.duration),
2231         GST_TIME_ARGS (demux->segment.time));
2232
2233     GST_DEBUG ("firstpcr gsttime : %" GST_TIME_FORMAT,
2234         GST_TIME_ARGS (demux->first_pcr.gsttime));
2235
2236     /* FIXME : This is not entirely correct. We should be using the PTS time
2237      * realm and not the PCR one. Doesn't matter *too* much if PTS/PCR values
2238      * aren't too far apart, but still.  */
2239     start = demux->first_pcr.gsttime + demux->segment.start;
2240     stop = demux->first_pcr.gsttime + demux->segment.duration;
2241     position = demux->segment.time;
2242   }
2243
2244   GST_DEBUG ("new segment:   start: %" GST_TIME_FORMAT " stop: %"
2245       GST_TIME_FORMAT " time: %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
2246       GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
2247   newsegmentevent =
2248       gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME, start, stop,
2249       position);
2250
2251   push_event ((MpegTSBase *) demux, newsegmentevent);
2252
2253   demux->need_newsegment = FALSE;
2254 }
2255
2256 static GstFlowReturn
2257 gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
2258 {
2259   GstFlowReturn res = GST_FLOW_OK;
2260   MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
2261
2262   GST_DEBUG ("stream:%p, pid:0x%04x stream_type:%d state:%d pad:%s:%s",
2263       stream, bs->pid, bs->stream_type, stream->state,
2264       GST_DEBUG_PAD_NAME (stream->pad));
2265
2266   if (G_UNLIKELY (stream->current == NULL)) {
2267     GST_LOG ("stream->current == NULL");
2268     goto beach;
2269   }
2270
2271   if (G_UNLIKELY (stream->state == PENDING_PACKET_EMPTY)) {
2272     GST_LOG ("EMPTY: returning");
2273     goto beach;
2274   }
2275
2276   if (G_UNLIKELY (stream->state != PENDING_PACKET_BUFFER))
2277     goto beach;
2278
2279   if (G_UNLIKELY (stream->pad == NULL)) {
2280     g_list_foreach (stream->currentlist, (GFunc) gst_buffer_unref, NULL);
2281     g_list_free (stream->currentlist);
2282     gst_buffer_list_iterator_free (stream->currentit);
2283     gst_buffer_list_unref (stream->current);
2284     goto beach;
2285   }
2286
2287   if (G_UNLIKELY (demux->need_newsegment))
2288     calculate_and_push_newsegment (demux, stream);
2289
2290   /* We have a confirmed buffer, let's push it out */
2291   GST_LOG ("Putting pending data into GstBufferList");
2292   stream->currentlist = g_list_reverse (stream->currentlist);
2293   gst_buffer_list_iterator_add_list (stream->currentit, stream->currentlist);
2294   gst_buffer_list_iterator_free (stream->currentit);
2295
2296   GST_DEBUG_OBJECT (stream->pad,
2297       "Pushing buffer list with timestamp: %" GST_TIME_FORMAT,
2298       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (gst_buffer_list_get
2299               (stream->current, 0, 0))));
2300
2301   res = gst_pad_push_list (stream->pad, stream->current);
2302   GST_DEBUG_OBJECT (stream->pad, "Returned %s", gst_flow_get_name (res));
2303   res = tsdemux_combine_flows (demux, stream, res);
2304   GST_DEBUG_OBJECT (stream->pad, "combined %s", gst_flow_get_name (res));
2305
2306 beach:
2307   /* Reset everything */
2308   GST_LOG ("Resetting to EMPTY");
2309   stream->state = PENDING_PACKET_EMPTY;
2310   memset (stream->pendingbuffers, 0, TS_MAX_PENDING_BUFFERS);
2311   stream->nbpending = 0;
2312   stream->current = NULL;
2313
2314   return res;
2315 }
2316
2317 static GstFlowReturn
2318 gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream,
2319     MpegTSPacketizerPacket * packet, MpegTSPacketizerSection * section)
2320 {
2321   GstFlowReturn res = GST_FLOW_OK;
2322
2323   GST_DEBUG ("buffer:%p, data:%p", GST_BUFFER_DATA (packet->buffer),
2324       packet->data);
2325   GST_LOG ("pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p",
2326       packet->pid,
2327       packet->payload_unit_start_indicator,
2328       packet->adaptation_field_control,
2329       packet->continuity_counter, packet->payload);
2330
2331   if (section) {
2332     GST_DEBUG ("section complete:%d, buffer size %d",
2333         section->complete, GST_BUFFER_SIZE (section->buffer));
2334     gst_buffer_unref (packet->buffer);
2335     return res;
2336   }
2337
2338   if (G_UNLIKELY (packet->payload_unit_start_indicator))
2339     /* Flush previous data */
2340     res = gst_ts_demux_push_pending_data (demux, stream);
2341
2342   if (packet->adaptation_field_control & 0x2) {
2343     if (packet->afc_flags & MPEGTS_AFC_PCR_FLAG)
2344       gst_ts_demux_record_pcr (demux, stream, packet->pcr,
2345           GST_BUFFER_OFFSET (packet->buffer));
2346     if (packet->afc_flags & MPEGTS_AFC_OPCR_FLAG)
2347       gst_ts_demux_record_opcr (demux, stream, packet->opcr,
2348           GST_BUFFER_OFFSET (packet->buffer));
2349   }
2350
2351   if (packet->payload)
2352     gst_ts_demux_queue_data (demux, stream, packet);
2353   else
2354     gst_buffer_unref (packet->buffer);
2355
2356   return res;
2357 }
2358
2359 static void
2360 gst_ts_demux_flush (MpegTSBase * base)
2361 {
2362   GstTSDemux *demux = GST_TS_DEMUX_CAST (base);
2363
2364   demux->need_newsegment = TRUE;
2365   gst_ts_demux_flush_streams (demux);
2366 }
2367
2368 static GstFlowReturn
2369 gst_ts_demux_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
2370     MpegTSPacketizerSection * section)
2371 {
2372   GstTSDemux *demux = GST_TS_DEMUX_CAST (base);
2373   TSDemuxStream *stream = NULL;
2374   GstFlowReturn res = GST_FLOW_OK;
2375
2376   if (G_LIKELY (demux->program)) {
2377     stream = (TSDemuxStream *) demux->program->streams[packet->pid];
2378
2379     if (stream) {
2380       res = gst_ts_demux_handle_packet (demux, stream, packet, section);
2381     } else if (packet->buffer)
2382       gst_buffer_unref (packet->buffer);
2383   } else {
2384     if (packet->buffer)
2385       gst_buffer_unref (packet->buffer);
2386   }
2387   return res;
2388 }
2389
2390 gboolean
2391 gst_ts_demux_plugin_init (GstPlugin * plugin)
2392 {
2393   GST_DEBUG_CATEGORY_INIT (ts_demux_debug, "tsdemux", 0,
2394       "MPEG transport stream demuxer");
2395   init_pes_parser ();
2396
2397   return gst_element_register (plugin, "tsdemux",
2398       GST_RANK_SECONDARY, GST_TYPE_TS_DEMUX);
2399 }