3ac82bd8b8ff91331531f6670b86cfb876d65b34
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-bad / gst / mpegtsdemux / tsdemux.c
1 /*
2  * tsdemux.c
3  * Copyright (C) 2009 Zaheer Abbas Merali
4  *               2010 Edward Hervey
5  * Copyright (C) 2011, Hewlett-Packard Development Company, L.P.
6  *  Author: Youness Alaoui <youness.alaoui@collabora.co.uk>, Collabora Ltd.
7  *  Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>, Collabora Ltd.
8  *  Author: Edward Hervey <bilboed@bilboed.com>, Collabora Ltd.
9  *
10  * Authors:
11  *   Zaheer Abbas Merali <zaheerabbas at merali dot org>
12  *   Edward Hervey <edward.hervey@collabora.co.uk>
13  *
14  * This library is free software; you can redistribute it and/or
15  * modify it under the terms of the GNU Library General Public
16  * License as published by the Free Software Foundation; either
17  * version 2 of the License, or (at your option) any later version.
18  *
19  * This library is distributed in the hope that it will be useful,
20  * but WITHOUT ANY WARRANTY; without even the implied warranty of
21  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22  * Library General Public License for more details.
23  *
24  * You should have received a copy of the GNU Library General Public
25  * License along with this library; if not, write to the
26  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
27  * Boston, MA 02110-1301, USA.
28  */
29
30 #ifdef HAVE_CONFIG_H
31 #include "config.h"
32 #endif
33
34 #include <stdlib.h>
35 #include <string.h>
36
37 #include <glib.h>
38 #include <gst/tag/tag.h>
39 #include <gst/pbutils/pbutils.h>
40 #include <gst/base/base.h>
41 #include <gst/audio/audio.h>
42
43 #include "mpegtsbase.h"
44 #include "tsdemux.h"
45 #include "gstmpegdesc.h"
46 #include "gstmpegdefs.h"
47 #include "mpegtspacketizer.h"
48 #include "pesparse.h"
49 #include <gst/codecparsers/gsth264parser.h>
50 #include <gst/codecparsers/gstmpegvideoparser.h>
51 #include <gst/video/video-color.h>
52
53 #include <math.h>
54
55 #define _gst_log2(x) (log(x)/log(2))
56
57 /*
58  * tsdemux
59  *
60  * See TODO for explanations on improvements needed
61  */
62
63 #define CONTINUITY_UNSET 255
64 #define MAX_CONTINUITY 15
65
66 /* Seeking/Scanning related variables */
67
68 /* seek to SEEK_TIMESTAMP_OFFSET before the desired offset and search then
69  * either accurately or for the next timestamp
70  */
71 #define SEEK_TIMESTAMP_OFFSET (2500 * GST_MSECOND)
72
73 #define GST_FLOW_REWINDING GST_FLOW_CUSTOM_ERROR
74
75 /* latency in msecs */
76 #define DEFAULT_LATENCY (700)
77
78 /* Limit PES packet collection to a maximum of 32MB
79  * which is more than large enough to support an H264 frame at
80  * maximum profile/level/bitrate at 30fps or above.
81  * PES bigger than this limit will be output in buffers of
82  * up to this size */
83 #define MAX_PES_PAYLOAD (32 * 1024 * 1024)
84
85 GST_DEBUG_CATEGORY_STATIC (ts_demux_debug);
86 #define GST_CAT_DEFAULT ts_demux_debug
87
88 #define ABSDIFF(a,b) (((a) > (b)) ? ((a) - (b)) : ((b) - (a)))
89
90 static GQuark QUARK_TSDEMUX;
91 static GQuark QUARK_PID;
92 static GQuark QUARK_PCR;
93 static GQuark QUARK_OPCR;
94 static GQuark QUARK_PTS;
95 static GQuark QUARK_DTS;
96 static GQuark QUARK_OFFSET;
97
98 typedef enum
99 {
100   PENDING_PACKET_EMPTY = 0,     /* No pending packet/buffer
101                                  * Push incoming buffers to the array */
102   PENDING_PACKET_HEADER,        /* PES header needs to be parsed
103                                  * Push incoming buffers to the array */
104   PENDING_PACKET_BUFFER,        /* Currently filling up output buffer
105                                  * Push incoming buffers to the bufferlist */
106   PENDING_PACKET_DISCONT        /* Discontinuity in incoming packets
107                                  * Drop all incoming buffers */
108 } PendingPacketState;
109
110 /* Pending buffer */
111 typedef struct
112 {
113   /* The fully reconstructed buffer */
114   GstBuffer *buffer;
115
116   /* Raw PTS/DTS (in 90kHz units) */
117   guint64 pts, dts;
118 } PendingBuffer;
119
120 typedef struct _TSDemuxStream TSDemuxStream;
121
122 typedef struct _TSDemuxH264ParsingInfos TSDemuxH264ParsingInfos;
123 typedef struct _TSDemuxJP2KParsingInfos TSDemuxJP2KParsingInfos;
124 typedef struct _TSDemuxADTSParsingInfos TSDemuxADTSParsingInfos;
125
126 /* Returns TRUE if a keyframe was found */
127 typedef gboolean (*GstTsDemuxKeyFrameScanFunction) (TSDemuxStream * stream,
128     guint8 * data, const gsize data_size, const gsize max_frame_offset);
129
130 typedef struct
131 {
132   guint8 *data;
133   gsize size;
134 } SimpleBuffer;
135
136 struct _TSDemuxH264ParsingInfos
137 {
138   /* H264 parsing data */
139   GstH264NalParser *parser;
140   GstByteWriter *sps;
141   GstByteWriter *pps;
142   GstByteWriter *sei;
143   SimpleBuffer framedata;
144 };
145
146 struct _TSDemuxJP2KParsingInfos
147 {
148   /* J2K parsing data */
149   gboolean interlace;
150 };
151
152 struct _TSDemuxADTSParsingInfos
153 {
154   guint mpegversion;
155 };
156
157 struct _TSDemuxStream
158 {
159   MpegTSBaseStream stream;
160
161   GstPad *pad;
162
163   /* Whether the pad was added or not */
164   gboolean active;
165
166   /* Whether this is a sparse stream (subtitles or metadata) */
167   gboolean sparse;
168
169   /* TRUE if we are waiting for a valid timestamp */
170   gboolean pending_ts;
171
172   /* Output data */
173   PendingPacketState state;
174
175   /* PES header being reconstructed (optional, allocated) */
176   guint8 *pending_header_data;
177   guint pending_header_size;
178
179   /* Data being reconstructed (allocated) */
180   guint8 *data;
181
182   /* Size of data being reconstructed (if known, else 0) */
183   guint expected_size;
184
185   /* Amount of bytes in current ->data */
186   guint current_size;
187   /* Size of ->data */
188   guint allocated_size;
189
190   /* Current PTS/DTS for this stream (in running time) */
191   GstClockTime pts;
192   GstClockTime dts;
193
194   /* Reference PTS used to detect gaps */
195   GstClockTime gap_ref_pts;
196   /* Number of outputted buffers */
197   guint32 nb_out_buffers;
198   /* Reference number of buffers for gaps */
199   guint32 gap_ref_buffers;
200
201   /* Current PTS/DTS for this stream (in 90kHz unit) */
202   guint64 raw_pts, raw_dts;
203
204   /* Whether this stream needs to send a newsegment */
205   gboolean need_newsegment;
206
207   /* Whether the next output buffer should be DISCONT */
208   gboolean discont;
209
210   /* The value to use when calculating the newsegment */
211   GstClockTime first_pts;
212
213   GstTagList *taglist;
214
215   gint continuity_counter;
216
217   /* List of pending buffers */
218   GList *pending;
219
220   /* if != 0, output only PES from that substream */
221   guint8 target_pes_substream;
222   gboolean needs_keyframe;
223
224   GstClockTime seeked_pts, seeked_dts;
225
226   GstTsDemuxKeyFrameScanFunction scan_function;
227   TSDemuxH264ParsingInfos h264infos;
228   TSDemuxJP2KParsingInfos jp2kInfos;
229   TSDemuxADTSParsingInfos atdsInfos;
230 };
231
232 #define VIDEO_CAPS \
233   GST_STATIC_CAPS (\
234     "video/mpeg, " \
235       "mpegversion = (int) { 1, 2, 4 }, " \
236       "systemstream = (boolean) FALSE; " \
237     "video/x-h264,stream-format=(string)byte-stream;" \
238     "video/x-h265,stream-format=(string)byte-stream;" \
239     "video/x-dirac;" \
240     "video/x-cavs;" \
241     "video/x-wmv," \
242       "wmvversion = (int) 3, " \
243       "format = (string) WVC1;" \
244       "image/x-jpc;" \
245 )
246
247 #define AUDIO_CAPS \
248   GST_STATIC_CAPS ( \
249     "audio/mpeg, " \
250       "mpegversion = (int) 1;" \
251     "audio/mpeg, " \
252       "mpegversion = (int) { 2, 4 }, " \
253       "stream-format = (string) adts; " \
254     "audio/mpeg, " \
255       "mpegversion = (int) 4, " \
256       "stream-format = (string) loas; " \
257     "audio/x-lpcm, " \
258       "width = (int) { 16, 20, 24 }, " \
259       "rate = (int) { 48000, 96000 }, " \
260       "channels = (int) [ 1, 8 ], " \
261       "dynamic_range = (int) [ 0, 255 ], " \
262       "emphasis = (boolean) { FALSE, TRUE }, " \
263       "mute = (boolean) { FALSE, TRUE }; " \
264     "audio/x-ac3; audio/x-eac3;" \
265     "audio/x-ac4;" \
266     "audio/x-dts;" \
267     "audio/x-opus;" \
268     "audio/x-private-ts-lpcm" \
269   )
270
271 /* Can also use the subpicture pads for text subtitles? */
272 #define SUBPICTURE_CAPS \
273     GST_STATIC_CAPS ("subpicture/x-pgs; subpicture/x-dvd; subpicture/x-dvb")
274
275 static GstStaticPadTemplate video_template =
276 GST_STATIC_PAD_TEMPLATE ("video_%01x_%05x", GST_PAD_SRC,
277     GST_PAD_SOMETIMES,
278     VIDEO_CAPS);
279
280 static GstStaticPadTemplate audio_template =
281 GST_STATIC_PAD_TEMPLATE ("audio_%01x_%05x",
282     GST_PAD_SRC,
283     GST_PAD_SOMETIMES,
284     AUDIO_CAPS);
285
286 static GstStaticPadTemplate subpicture_template =
287 GST_STATIC_PAD_TEMPLATE ("subpicture_%01x_%05x",
288     GST_PAD_SRC,
289     GST_PAD_SOMETIMES,
290     SUBPICTURE_CAPS);
291
292 static GstStaticPadTemplate private_template =
293 GST_STATIC_PAD_TEMPLATE ("private_%01x_%05x",
294     GST_PAD_SRC,
295     GST_PAD_SOMETIMES,
296     GST_STATIC_CAPS_ANY);
297
298 enum
299 {
300   PROP_0,
301   PROP_PROGRAM_NUMBER,
302   PROP_EMIT_STATS,
303   PROP_LATENCY,
304   PROP_SEND_SCTE35_EVENTS,
305   /* FILL ME */
306 };
307
308 /* Pad functions */
309
310
311 /* mpegtsbase methods */
312 static void
313 gst_ts_demux_update_program (MpegTSBase * base, MpegTSBaseProgram * program);
314 static void
315 gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program);
316 static void
317 gst_ts_demux_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program);
318 static gboolean
319 gst_ts_demux_can_remove_program (MpegTSBase * base,
320     MpegTSBaseProgram * program);
321 static void gst_ts_demux_reset (MpegTSBase * base);
322 static GstFlowReturn
323 gst_ts_demux_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
324     GstMpegtsSection * section);
325 static void gst_ts_demux_flush (MpegTSBase * base, gboolean hard);
326 static GstFlowReturn gst_ts_demux_drain (MpegTSBase * base);
327 static gboolean
328 gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * stream,
329     MpegTSBaseProgram * program);
330 static void
331 gst_ts_demux_stream_removed (MpegTSBase * base, MpegTSBaseStream * stream);
332 static GstFlowReturn gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event);
333 static void gst_ts_demux_set_property (GObject * object, guint prop_id,
334     const GValue * value, GParamSpec * pspec);
335 static void gst_ts_demux_get_property (GObject * object, guint prop_id,
336     GValue * value, GParamSpec * pspec);
337 static void gst_ts_demux_flush_streams (GstTSDemux * tsdemux, gboolean hard);
338 static GstFlowReturn
339 gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream,
340     MpegTSBaseProgram * program);
341 static void gst_ts_demux_stream_flush (TSDemuxStream * stream,
342     GstTSDemux * demux, gboolean hard);
343
344 static gboolean push_event (MpegTSBase * base, GstEvent * event);
345 static gboolean sink_query (MpegTSBase * base, GstQuery * query);
346 static void gst_ts_demux_check_and_sync_streams (GstTSDemux * demux,
347     GstClockTime time);
348 static void handle_psi (MpegTSBase * base, GstMpegtsSection * section);
349
350 static void
351 _extra_init (void)
352 {
353   QUARK_TSDEMUX = g_quark_from_string ("tsdemux");
354   QUARK_PID = g_quark_from_string ("pid");
355   QUARK_PCR = g_quark_from_string ("pcr");
356   QUARK_OPCR = g_quark_from_string ("opcr");
357   QUARK_PTS = g_quark_from_string ("pts");
358   QUARK_DTS = g_quark_from_string ("dts");
359   QUARK_OFFSET = g_quark_from_string ("offset");
360 }
361
362 #define gst_ts_demux_parent_class parent_class
363 G_DEFINE_TYPE_WITH_CODE (GstTSDemux, gst_ts_demux, GST_TYPE_MPEGTS_BASE,
364     _extra_init ());
365 #define _do_element_init \
366   GST_DEBUG_CATEGORY_INIT (ts_demux_debug, "tsdemux", 0, \
367       "MPEG transport stream demuxer");\
368   init_pes_parser ();
369 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (tsdemux, "tsdemux",
370     GST_RANK_PRIMARY, GST_TYPE_TS_DEMUX, _do_element_init);
371
372 static void
373 gst_ts_demux_dispose (GObject * object)
374 {
375   GstTSDemux *demux = GST_TS_DEMUX_CAST (object);
376
377   gst_flow_combiner_free (demux->flowcombiner);
378
379   GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object));
380 }
381
382 static void
383 gst_ts_demux_finalize (GObject * object)
384 {
385   GstTSDemux *demux = GST_TS_DEMUX_CAST (object);
386
387   gst_event_replace (&demux->segment_event, NULL);
388   g_mutex_clear (&demux->lock);
389
390   GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object));
391 }
392
393 static void
394 gst_ts_demux_class_init (GstTSDemuxClass * klass)
395 {
396   GObjectClass *gobject_class;
397   GstElementClass *element_class;
398   MpegTSBaseClass *ts_class;
399
400   gobject_class = G_OBJECT_CLASS (klass);
401   gobject_class->set_property = gst_ts_demux_set_property;
402   gobject_class->get_property = gst_ts_demux_get_property;
403   gobject_class->dispose = gst_ts_demux_dispose;
404   gobject_class->finalize = gst_ts_demux_finalize;
405
406   g_object_class_install_property (gobject_class, PROP_PROGRAM_NUMBER,
407       g_param_spec_int ("program-number", "Program number",
408           "Program Number to demux for (-1 to ignore)", -1, G_MAXINT,
409           -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
410
411   g_object_class_install_property (gobject_class, PROP_EMIT_STATS,
412       g_param_spec_boolean ("emit-stats", "Emit statistics",
413           "Emit messages for every pcr/opcr/pts/dts", FALSE,
414           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
415
416   /**
417    * tsdemux:send-scte35-events:
418    *
419    * Whether SCTE 35 sections should be forwarded as events.
420    *
421    * When forwarding those, potential splice times are converted
422    * to running time, and can be used by a downstream muxer to reinject
423    * the sections.
424    *
425    * Since: 1.20
426    */
427   g_object_class_install_property (gobject_class, PROP_SEND_SCTE35_EVENTS,
428       g_param_spec_boolean ("send-scte35-events", "Send SCTE 35 events",
429           "Whether SCTE 35 sections should be forwarded as events", FALSE,
430           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
431
432   g_object_class_install_property (gobject_class, PROP_LATENCY,
433       g_param_spec_int ("latency", "Latency",
434           "Latency to add for smooth demuxing (in ms)", -1,
435           G_MAXINT, DEFAULT_LATENCY,
436           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
437
438   element_class = GST_ELEMENT_CLASS (klass);
439   gst_element_class_add_pad_template (element_class,
440       gst_static_pad_template_get (&video_template));
441   gst_element_class_add_pad_template (element_class,
442       gst_static_pad_template_get (&audio_template));
443   gst_element_class_add_pad_template (element_class,
444       gst_static_pad_template_get (&subpicture_template));
445   gst_element_class_add_pad_template (element_class,
446       gst_static_pad_template_get (&private_template));
447
448   gst_element_class_set_static_metadata (element_class,
449       "MPEG transport stream demuxer",
450       "Codec/Demuxer",
451       "Demuxes MPEG2 transport streams",
452       "Zaheer Abbas Merali <zaheerabbas at merali dot org>\n"
453       "Edward Hervey <edward.hervey@collabora.co.uk>");
454
455   ts_class = GST_MPEGTS_BASE_CLASS (klass);
456   ts_class->reset = GST_DEBUG_FUNCPTR (gst_ts_demux_reset);
457   ts_class->push = GST_DEBUG_FUNCPTR (gst_ts_demux_push);
458   ts_class->push_event = GST_DEBUG_FUNCPTR (push_event);
459   ts_class->handle_psi = GST_DEBUG_FUNCPTR (handle_psi);
460   ts_class->sink_query = GST_DEBUG_FUNCPTR (sink_query);
461   ts_class->program_started = GST_DEBUG_FUNCPTR (gst_ts_demux_program_started);
462   ts_class->program_stopped = GST_DEBUG_FUNCPTR (gst_ts_demux_program_stopped);
463   ts_class->update_program = GST_DEBUG_FUNCPTR (gst_ts_demux_update_program);
464   ts_class->can_remove_program = gst_ts_demux_can_remove_program;
465   ts_class->stream_added = gst_ts_demux_stream_added;
466   ts_class->stream_removed = gst_ts_demux_stream_removed;
467   ts_class->seek = GST_DEBUG_FUNCPTR (gst_ts_demux_do_seek);
468   ts_class->flush = GST_DEBUG_FUNCPTR (gst_ts_demux_flush);
469   ts_class->drain = GST_DEBUG_FUNCPTR (gst_ts_demux_drain);
470 }
471
472 static void
473 gst_ts_demux_reset (MpegTSBase * base)
474 {
475   GstTSDemux *demux = (GstTSDemux *) base;
476
477   demux->rate = 1.0;
478   g_mutex_lock (&demux->lock);
479   gst_event_replace (&demux->segment_event, NULL);
480   g_mutex_unlock (&demux->lock);
481
482   if (demux->global_tags) {
483     gst_tag_list_unref (demux->global_tags);
484     demux->global_tags = NULL;
485   }
486
487   if (demux->previous_program) {
488     mpegts_base_deactivate_and_free_program (base, demux->previous_program);
489     demux->previous_program = NULL;
490   }
491
492   demux->have_group_id = FALSE;
493   demux->group_id = G_MAXUINT;
494
495   demux->last_seek_offset = -1;
496   demux->program_generation = 0;
497
498   demux->mpeg_pts_offset = 0;
499 }
500
501 static void
502 gst_ts_demux_init (GstTSDemux * demux)
503 {
504   MpegTSBase *base = (MpegTSBase *) demux;
505
506   base->stream_size = sizeof (TSDemuxStream);
507   base->parse_private_sections = TRUE;
508   /* We are not interested in sections (all handled by mpegtsbase) */
509   base->push_section = FALSE;
510
511   demux->flowcombiner = gst_flow_combiner_new ();
512   demux->requested_program_number = -1;
513   demux->program_number = -1;
514   demux->latency = DEFAULT_LATENCY;
515   gst_ts_demux_reset (base);
516
517   g_mutex_init (&demux->lock);
518 }
519
520
521 static void
522 gst_ts_demux_set_property (GObject * object, guint prop_id,
523     const GValue * value, GParamSpec * pspec)
524 {
525   GstTSDemux *demux = GST_TS_DEMUX (object);
526
527   switch (prop_id) {
528     case PROP_PROGRAM_NUMBER:
529       /* FIXME: do something if program is switched as opposed to set at
530        * beginning */
531       demux->requested_program_number = g_value_get_int (value);
532       break;
533     case PROP_EMIT_STATS:
534       demux->emit_statistics = g_value_get_boolean (value);
535       break;
536     case PROP_SEND_SCTE35_EVENTS:
537       demux->send_scte35_events = g_value_get_boolean (value);
538       break;
539     case PROP_LATENCY:
540       demux->latency = g_value_get_int (value);
541       break;
542     default:
543       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
544   }
545 }
546
547 static void
548 gst_ts_demux_get_property (GObject * object, guint prop_id,
549     GValue * value, GParamSpec * pspec)
550 {
551   GstTSDemux *demux = GST_TS_DEMUX (object);
552
553   switch (prop_id) {
554     case PROP_PROGRAM_NUMBER:
555       g_value_set_int (value, demux->requested_program_number);
556       break;
557     case PROP_EMIT_STATS:
558       g_value_set_boolean (value, demux->emit_statistics);
559       break;
560     case PROP_SEND_SCTE35_EVENTS:
561       g_value_set_boolean (value, demux->send_scte35_events);
562       break;
563     case PROP_LATENCY:
564       g_value_set_int (value, demux->latency);
565       break;
566     default:
567       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
568   }
569 }
570
571 static gboolean
572 gst_ts_demux_get_duration (GstTSDemux * demux, GstClockTime * dur)
573 {
574   MpegTSBase *base = (MpegTSBase *) demux;
575   gboolean res = FALSE;
576   gint64 val;
577
578   if (!demux->program) {
579     GST_DEBUG_OBJECT (demux, "No active program yet, can't provide duration");
580     return FALSE;
581   }
582
583   /* Get total size in bytes */
584   if (gst_pad_peer_query_duration (base->sinkpad, GST_FORMAT_BYTES, &val)) {
585     /* Convert it to duration */
586     *dur =
587         mpegts_packetizer_offset_to_ts (base->packetizer, val,
588         demux->program->pcr_pid);
589     if (GST_CLOCK_TIME_IS_VALID (*dur))
590       res = TRUE;
591   }
592   return res;
593 }
594
595 static gboolean
596 gst_ts_demux_srcpad_query (GstPad * pad, GstObject * parent, GstQuery * query)
597 {
598   gboolean res = TRUE;
599   GstFormat format;
600   GstTSDemux *demux;
601   MpegTSBase *base;
602
603   demux = GST_TS_DEMUX (parent);
604   base = GST_MPEGTS_BASE (demux);
605
606   switch (GST_QUERY_TYPE (query)) {
607     case GST_QUERY_DURATION:
608     {
609       GST_DEBUG_OBJECT (pad, "query duration");
610       gst_query_parse_duration (query, &format, NULL);
611       if (format == GST_FORMAT_TIME) {
612         if (!gst_pad_peer_query (base->sinkpad, query)) {
613           GstClockTime dur;
614           if (gst_ts_demux_get_duration (demux, &dur))
615             gst_query_set_duration (query, GST_FORMAT_TIME, dur);
616           else
617             res = FALSE;
618         }
619       } else {
620         GST_DEBUG_OBJECT (demux, "only query duration on TIME is supported");
621         res = FALSE;
622       }
623       break;
624     }
625     case GST_QUERY_LATENCY:
626     {
627       GST_DEBUG_OBJECT (pad, "query latency");
628       res = gst_pad_peer_query (base->sinkpad, query);
629       if (res) {
630         GstClockTime min_lat, max_lat;
631         gboolean live;
632         gint latency;
633
634         /* According to H.222.0
635            Annex D.0.3 (System Time Clock recovery in the decoder)
636            and D.0.2 (Audio and video presentation synchronization)
637
638            We can end up with an interval of up to 700ms between valid
639            PTS/DTS. We therefore allow a latency of 700ms for that.
640          */
641         latency = demux->latency;
642         if (latency < 0)
643           latency = 700;
644         gst_query_parse_latency (query, &live, &min_lat, &max_lat);
645         min_lat += latency * GST_MSECOND;
646         if (GST_CLOCK_TIME_IS_VALID (max_lat))
647           max_lat += latency * GST_MSECOND;
648         gst_query_set_latency (query, live, min_lat, max_lat);
649       }
650       break;
651     }
652     case GST_QUERY_SEEKING:
653     {
654       GST_DEBUG_OBJECT (pad, "query seeking");
655       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
656       GST_DEBUG_OBJECT (pad, "asked for format %s",
657           gst_format_get_name (format));
658       if (format == GST_FORMAT_TIME) {
659         gboolean seekable = FALSE;
660
661         if (gst_pad_peer_query (base->sinkpad, query))
662           gst_query_parse_seeking (query, NULL, &seekable, NULL, NULL);
663
664         /* If upstream is not seekable in TIME format we use
665          * our own values here */
666         if (!seekable) {
667           GstClockTime dur;
668           if (gst_ts_demux_get_duration (demux, &dur)) {
669             gst_query_set_seeking (query, GST_FORMAT_TIME, TRUE, 0, dur);
670             GST_DEBUG_OBJECT (pad, "Gave duration: %" GST_TIME_FORMAT,
671                 GST_TIME_ARGS (dur));
672           }
673         }
674       } else {
675         GST_DEBUG_OBJECT (demux, "only TIME is supported for query seeking");
676         res = FALSE;
677       }
678       break;
679     }
680     case GST_QUERY_SEGMENT:{
681       GstFormat format;
682       gint64 start, stop;
683
684       format = base->out_segment.format;
685
686       start =
687           gst_segment_to_stream_time (&base->out_segment, format,
688           base->out_segment.start);
689       if ((stop = base->out_segment.stop) == -1)
690         stop = base->out_segment.duration;
691       else
692         stop = gst_segment_to_stream_time (&base->out_segment, format, stop);
693
694       gst_query_set_segment (query, base->out_segment.rate, format, start,
695           stop);
696       res = TRUE;
697       break;
698     }
699     default:
700       res = gst_pad_query_default (pad, parent, query);
701   }
702
703   return res;
704
705 }
706
707 static void
708 clear_simple_buffer (SimpleBuffer * sbuf)
709 {
710   if (!sbuf->data)
711     return;
712
713   g_free (sbuf->data);
714   sbuf->size = 0;
715   sbuf->data = NULL;
716 }
717
718 static gboolean
719 scan_keyframe_h264 (TSDemuxStream * stream, const guint8 * data,
720     const gsize data_size, const gsize max_frame_offset)
721 {
722   gint offset = 0;
723   GstH264NalUnit unit, frame_unit = { 0, };
724   GstH264ParserResult res = GST_H264_PARSER_OK;
725   TSDemuxH264ParsingInfos *h264infos = &stream->h264infos;
726
727   GstH264NalParser *parser = h264infos->parser;
728
729   if (G_UNLIKELY (parser == NULL)) {
730     parser = h264infos->parser = gst_h264_nal_parser_new ();
731     h264infos->sps = gst_byte_writer_new ();
732     h264infos->pps = gst_byte_writer_new ();
733     h264infos->sei = gst_byte_writer_new ();
734   }
735
736   while (res == GST_H264_PARSER_OK) {
737     res =
738         gst_h264_parser_identify_nalu (parser, data, offset, data_size, &unit);
739
740     if (res != GST_H264_PARSER_OK && res != GST_H264_PARSER_NO_NAL_END) {
741       GST_INFO_OBJECT (stream->pad, "Error identifying nalu: %i", res);
742       break;
743     }
744
745     res = gst_h264_parser_parse_nal (parser, &unit);
746     if (res != GST_H264_PARSER_OK) {
747       break;
748     }
749
750     switch (unit.type) {
751       case GST_H264_NAL_SEI:
752         if (frame_unit.size)
753           break;
754
755         if (gst_byte_writer_put_data (h264infos->sei,
756                 unit.data + unit.sc_offset,
757                 unit.size + unit.offset - unit.sc_offset)) {
758           GST_DEBUG_OBJECT (stream->pad, "adding SEI %u",
759               unit.size + unit.offset - unit.sc_offset);
760         } else {
761           GST_WARNING_OBJECT (stream->pad, "Could not write SEI");
762         }
763         break;
764       case GST_H264_NAL_PPS:
765         if (frame_unit.size)
766           break;
767
768         if (gst_byte_writer_put_data (h264infos->pps,
769                 unit.data + unit.sc_offset,
770                 unit.size + unit.offset - unit.sc_offset)) {
771           GST_DEBUG_OBJECT (stream->pad, "adding PPS %u",
772               unit.size + unit.offset - unit.sc_offset);
773         } else {
774           GST_WARNING_OBJECT (stream->pad, "Could not write PPS");
775         }
776         break;
777       case GST_H264_NAL_SPS:
778         if (frame_unit.size)
779           break;
780
781         if (gst_byte_writer_put_data (h264infos->sps,
782                 unit.data + unit.sc_offset,
783                 unit.size + unit.offset - unit.sc_offset)) {
784           GST_DEBUG_OBJECT (stream->pad, "adding SPS %u",
785               unit.size + unit.offset - unit.sc_offset);
786         } else {
787           GST_WARNING_OBJECT (stream->pad, "Could not write SPS");
788         }
789         break;
790         /* these units are considered keyframes in h264parse */
791       case GST_H264_NAL_SLICE:
792       case GST_H264_NAL_SLICE_DPA:
793       case GST_H264_NAL_SLICE_DPB:
794       case GST_H264_NAL_SLICE_DPC:
795       case GST_H264_NAL_SLICE_IDR:
796       {
797         GstH264SliceHdr slice;
798
799         if (h264infos->framedata.size)
800           break;
801
802         res = gst_h264_parser_parse_slice_hdr (parser, &unit, &slice,
803             FALSE, FALSE);
804
805         if (GST_H264_IS_I_SLICE (&slice) || GST_H264_IS_SI_SLICE (&slice)) {
806           if (*(unit.data + unit.offset + 1) & 0x80) {
807             /* means first_mb_in_slice == 0 */
808             /* real frame data */
809             GST_DEBUG_OBJECT (stream->pad, "Found keyframe at: %u",
810                 unit.sc_offset);
811             frame_unit = unit;
812           }
813         }
814
815         break;
816       }
817       default:
818         break;
819     }
820
821     if (offset == unit.sc_offset + unit.size)
822       break;
823
824     offset = unit.sc_offset + unit.size;
825   }
826
827   /* We've got all the infos we need (SPS / PPS and a keyframe, plus
828    * and possibly SEI units. We can stop rewinding the stream
829    */
830   if (gst_byte_writer_get_size (h264infos->sps) &&
831       gst_byte_writer_get_size (h264infos->pps) &&
832       (h264infos->framedata.size || frame_unit.size)) {
833     guint8 *data = NULL;
834
835     gsize tmpsize = gst_byte_writer_get_size (h264infos->pps);
836
837     /*  We know that the SPS is first so just put all our data in there */
838     data = gst_byte_writer_reset_and_get_data (h264infos->pps);
839     gst_byte_writer_put_data (h264infos->sps, data, tmpsize);
840     g_free (data);
841
842     tmpsize = gst_byte_writer_get_size (h264infos->sei);
843     if (tmpsize) {
844       GST_DEBUG_OBJECT (stream->pad, "Adding SEI");
845       data = gst_byte_writer_reset_and_get_data (h264infos->sei);
846       gst_byte_writer_put_data (h264infos->sps, data, tmpsize);
847       g_free (data);
848     }
849
850     if (frame_unit.size) {      /*  We found the everything in one go! */
851       GST_DEBUG_OBJECT (stream->pad, "Adding Keyframe");
852       gst_byte_writer_put_data (h264infos->sps,
853           frame_unit.data + frame_unit.sc_offset,
854           stream->current_size - frame_unit.sc_offset);
855     } else {
856       GST_DEBUG_OBJECT (stream->pad, "Adding Keyframe");
857       gst_byte_writer_put_data (h264infos->sps,
858           h264infos->framedata.data, h264infos->framedata.size);
859       clear_simple_buffer (&h264infos->framedata);
860     }
861
862     g_free (stream->data);
863     stream->current_size = gst_byte_writer_get_size (h264infos->sps);
864     stream->data = gst_byte_writer_reset_and_get_data (h264infos->sps);
865     gst_byte_writer_init (h264infos->sps);
866     gst_byte_writer_init (h264infos->pps);
867     gst_byte_writer_init (h264infos->sei);
868
869     return TRUE;
870   }
871
872   if (frame_unit.size) {
873     GST_DEBUG_OBJECT (stream->pad, "Keep the keyframe as this is the one"
874         " we will push later");
875
876     h264infos->framedata.data =
877         g_memdup2 (frame_unit.data + frame_unit.sc_offset,
878         stream->current_size - frame_unit.sc_offset);
879     h264infos->framedata.size = stream->current_size - frame_unit.sc_offset;
880   }
881
882   return FALSE;
883 }
884
885 /* We merge data from TS packets so that the scanning methods get a continuous chunk,
886  however the scanning method will return keyframe offset which needs to be translated
887  back to actual offset in file */
888 typedef struct
889 {
890   gint64 real_offset;           /* offset of TS packet */
891   gint merged_offset;           /* offset of merged data in buffer */
892 } OffsetInfo;
893
894 static gboolean
895 gst_ts_demux_adjust_seek_offset_for_keyframe (TSDemuxStream * stream,
896     guint8 * data, guint64 size)
897 {
898   int scan_pid = -1;
899
900   if (!stream->scan_function)
901     return TRUE;
902
903   scan_pid = ((MpegTSBaseStream *) stream)->pid;
904
905   if (scan_pid != -1) {
906     return stream->scan_function (stream, data, size, size);
907   }
908
909   return TRUE;
910 }
911
912 static GstFlowReturn
913 gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event)
914 {
915   GList *tmp;
916
917   GstTSDemux *demux = (GstTSDemux *) base;
918   GstFlowReturn res = GST_FLOW_ERROR;
919   gdouble rate;
920   GstFormat format;
921   GstSeekFlags flags;
922   GstSeekType start_type, stop_type;
923   gint64 start, stop;
924   guint64 start_offset;
925   gboolean update = FALSE;
926   GstSegment seeksegment;
927
928   GST_DEBUG_OBJECT (demux, "seek event, %" GST_PTR_FORMAT, event);
929
930   if (base->out_segment.format == GST_FORMAT_UNDEFINED) {
931     GST_DEBUG_OBJECT (demux, "Cannot process seek event now, delaying");
932     gst_event_replace (&base->seek_event, event);
933     res = GST_FLOW_OK;
934     goto done;
935   }
936
937   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
938       &stop_type, &stop);
939
940   if (rate <= 0.0) {
941     GST_WARNING_OBJECT (demux, "Negative rate not supported");
942     goto done;
943   }
944
945   if (flags & (GST_SEEK_FLAG_SEGMENT)) {
946     GST_WARNING_OBJECT (demux, "seek flags 0x%x are not supported",
947         (int) flags);
948     goto done;
949   }
950
951   /* configure the segment with the seek variables */
952   memcpy (&seeksegment, &base->out_segment, sizeof (GstSegment));
953   GST_LOG_OBJECT (demux, "Before seek, output segment %" GST_SEGMENT_FORMAT,
954       &seeksegment);
955
956   /* record offset and rate */
957   demux->rate = rate;
958   if (!gst_segment_do_seek (&seeksegment, rate, format, flags, start_type,
959           start, stop_type, stop, &update)) {
960     GST_DEBUG_OBJECT (demux, "Seek failed in gst_segment_do_seek()");
961     goto done;
962   }
963
964   GST_DEBUG_OBJECT (demux,
965       "After seek, update %d output segment now %" GST_SEGMENT_FORMAT, update,
966       &seeksegment);
967
968   /* If the position actually changed, update == TRUE */
969   g_mutex_lock (&demux->lock);
970   if (update) {
971     GstClockTime target = seeksegment.start;
972     if (target >= SEEK_TIMESTAMP_OFFSET)
973       target -= SEEK_TIMESTAMP_OFFSET;
974     else
975       target = 0;
976
977     start_offset =
978         mpegts_packetizer_ts_to_offset (base->packetizer, target,
979         demux->program->pcr_pid);
980     if (G_UNLIKELY (start_offset == -1)) {
981       GST_WARNING_OBJECT (demux,
982           "Couldn't convert start position to an offset");
983       g_mutex_unlock (&demux->lock);
984       goto done;
985     }
986
987     base->seek_offset = start_offset;
988     demux->last_seek_offset = base->seek_offset;
989     /* Reset segment if we're not doing an accurate seek */
990     demux->reset_segment = (!(flags & GST_SEEK_FLAG_ACCURATE));
991
992     /* Clear any existing segment - it will be recalculated after streaming recommences */
993     gst_event_replace (&demux->segment_event, NULL);
994
995     for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
996       TSDemuxStream *stream = tmp->data;
997
998       if (flags & GST_SEEK_FLAG_ACCURATE)
999         stream->needs_keyframe = TRUE;
1000
1001       stream->seeked_pts = GST_CLOCK_TIME_NONE;
1002       stream->seeked_dts = GST_CLOCK_TIME_NONE;
1003       stream->first_pts = GST_CLOCK_TIME_NONE;
1004       stream->need_newsegment = TRUE;
1005     }
1006   } else {
1007     /* Position didn't change, just update the output segment based on
1008      * our new one */
1009     gst_event_take (&demux->segment_event,
1010         gst_event_new_segment (&seeksegment));
1011     if (base->last_seek_seqnum)
1012       gst_event_set_seqnum (demux->segment_event, base->last_seek_seqnum);
1013     for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
1014       TSDemuxStream *stream = tmp->data;
1015       stream->need_newsegment = TRUE;
1016     }
1017   }
1018   g_mutex_unlock (&demux->lock);
1019
1020   /* Commit the new segment */
1021   memcpy (&base->out_segment, &seeksegment, sizeof (GstSegment));
1022   res = GST_FLOW_OK;
1023
1024 done:
1025   return res;
1026 }
1027
1028 static gboolean
1029 gst_ts_demux_srcpad_event (GstPad * pad, GstObject * parent, GstEvent * event)
1030 {
1031   gboolean res = TRUE;
1032   GstTSDemux *demux = GST_TS_DEMUX (parent);
1033
1034   GST_DEBUG_OBJECT (pad, "Got event %s",
1035       gst_event_type_get_name (GST_EVENT_TYPE (event)));
1036
1037   switch (GST_EVENT_TYPE (event)) {
1038     case GST_EVENT_SEEK:
1039       res = mpegts_base_handle_seek_event ((MpegTSBase *) demux, pad, event);
1040       if (!res)
1041         GST_WARNING_OBJECT (pad, "seeking failed");
1042       gst_event_unref (event);
1043       break;
1044     default:
1045       res = gst_pad_event_default (pad, parent, event);
1046   }
1047
1048   return res;
1049 }
1050
1051 static void
1052 clean_global_taglist (GstTagList * taglist)
1053 {
1054   gst_tag_list_remove_tag (taglist, GST_TAG_CONTAINER_FORMAT);
1055   gst_tag_list_remove_tag (taglist, GST_TAG_CODEC);
1056 }
1057
1058 static gboolean
1059 push_event (MpegTSBase * base, GstEvent * event)
1060 {
1061   GstTSDemux *demux = (GstTSDemux *) base;
1062   GList *tmp;
1063   gboolean early_ret = FALSE;
1064
1065   if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
1066     if (base->segment.format == GST_FORMAT_TIME && base->ignore_pcr) {
1067       /* Shift start/stop values by 2s */
1068       base->packetizer->extra_shift = 2 * GST_SECOND;
1069       if (GST_CLOCK_TIME_IS_VALID (base->segment.start))
1070         base->segment.start += 2 * GST_SECOND;
1071       if (GST_CLOCK_TIME_IS_VALID (base->segment.stop))
1072         base->segment.stop += 2 * GST_SECOND;
1073       if (GST_CLOCK_TIME_IS_VALID (base->segment.position))
1074         base->segment.position += 2 * GST_SECOND;
1075     }
1076     GST_DEBUG_OBJECT (base, "Ignoring segment event (recreated later)");
1077     gst_event_unref (event);
1078     return TRUE;
1079
1080   } else if (GST_EVENT_TYPE (event) == GST_EVENT_TAG) {
1081     /* In case we receive tags before data, store them to send later
1082      * If we already have the program, send it right away */
1083     GstTagList *taglist;
1084
1085     gst_event_parse_tag (event, &taglist);
1086
1087     if (demux->global_tags == NULL) {
1088       demux->global_tags = gst_tag_list_copy (taglist);
1089
1090       /* Tags that are stream specific for the container should be considered
1091        * global for the container streams */
1092       if (gst_tag_list_get_scope (taglist) == GST_TAG_SCOPE_STREAM) {
1093         gst_tag_list_set_scope (demux->global_tags, GST_TAG_SCOPE_GLOBAL);
1094       }
1095     } else {
1096       demux->global_tags = gst_tag_list_make_writable (demux->global_tags);
1097       gst_tag_list_insert (demux->global_tags, taglist, GST_TAG_MERGE_REPLACE);
1098     }
1099     clean_global_taglist (demux->global_tags);
1100
1101     /* tags are stored to be used after if there are no streams yet,
1102      * so we should never reject */
1103     early_ret = TRUE;
1104   }
1105
1106   if (G_UNLIKELY (demux->program == NULL)) {
1107     gst_event_unref (event);
1108     return early_ret;
1109   }
1110
1111   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
1112     TSDemuxStream *stream = (TSDemuxStream *) tmp->data;
1113     if (stream->pad) {
1114       /* If we are pushing out EOS, flush out pending data first */
1115       if (GST_EVENT_TYPE (event) == GST_EVENT_EOS &&
1116           gst_pad_is_active (stream->pad))
1117         gst_ts_demux_push_pending_data (demux, stream, NULL);
1118
1119       gst_event_ref (event);
1120       gst_pad_push_event (stream->pad, event);
1121     }
1122   }
1123
1124   gst_event_unref (event);
1125
1126   return TRUE;
1127 }
1128
1129 static void
1130 handle_psi (MpegTSBase * base, GstMpegtsSection * section)
1131 {
1132   GstTSDemux *demux = (GstTSDemux *) base;
1133
1134   if (section->section_type == GST_MPEGTS_SECTION_SCTE_SIT) {
1135     GList *tmp;
1136     gboolean forward = FALSE;
1137
1138     if (demux->send_scte35_events) {
1139       for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
1140         TSDemuxStream *stream = (TSDemuxStream *) tmp->data;
1141
1142         if (stream->stream.pid == section->pid) {
1143           forward = TRUE;
1144           break;
1145         }
1146       }
1147     }
1148
1149     /* Create a new section to travel through the pipeline, with splice
1150      * times translated from local time to running time */
1151     if (forward) {
1152       GstEvent *event;
1153       GstStructure *s;
1154       GstStructure *rtime_map;
1155       GstClockTime pts;
1156       guint i = 0;
1157       GstMpegtsSection *new_section =
1158           (GstMpegtsSection *) gst_mini_object_copy ((GstMiniObject *) section);
1159       GstMpegtsSCTESIT *sit =
1160           (GstMpegtsSCTESIT *) gst_mpegts_section_get_scte_sit (new_section);
1161
1162       rtime_map = gst_structure_new_empty ("running-time-map");
1163
1164       if (sit->fully_parsed) {
1165         if (sit->splice_time_specified) {
1166           pts =
1167               mpegts_packetizer_pts_to_ts (base->packetizer,
1168               MPEGTIME_TO_GSTTIME (sit->splice_time +
1169                   sit->pts_adjustment), demux->program->pcr_pid);
1170           gst_structure_set (rtime_map, "splice-time", G_TYPE_UINT64,
1171               gst_segment_to_running_time (&base->out_segment, GST_FORMAT_TIME,
1172                   pts), NULL);
1173         }
1174
1175         for (i = 0; i < sit->splices->len; i++) {
1176           gchar *field_name;
1177           GstMpegtsSCTESpliceEvent *sevent =
1178               g_ptr_array_index (sit->splices, i);
1179
1180           if (sevent->program_splice_time_specified) {
1181             pts =
1182                 mpegts_packetizer_pts_to_ts_unchecked (base->packetizer,
1183                 MPEGTIME_TO_GSTTIME (sevent->program_splice_time +
1184                     sit->pts_adjustment), demux->program->pcr_pid);
1185             field_name =
1186                 g_strdup_printf ("event-%u-splice-time",
1187                 sevent->splice_event_id);
1188             gst_structure_set (rtime_map, field_name, G_TYPE_UINT64,
1189                 gst_segment_to_running_time (&base->out_segment,
1190                     GST_FORMAT_TIME, pts), NULL);
1191             g_free (field_name);
1192           }
1193         }
1194       }
1195
1196       event = gst_event_new_mpegts_section (new_section);
1197       gst_mpegts_section_unref (new_section);
1198
1199       s = gst_event_writable_structure (event);
1200       gst_structure_set (s, "mpeg-pts-offset", G_TYPE_UINT64,
1201           demux->mpeg_pts_offset, "running-time-map", GST_TYPE_STRUCTURE,
1202           rtime_map, NULL);
1203       gst_structure_free (rtime_map);
1204
1205       push_event (base, event);
1206     }
1207   }
1208 }
1209
1210 static gboolean
1211 sink_query (MpegTSBase * base, GstQuery * query)
1212 {
1213   GstTSDemux *demux = (GstTSDemux *) base;
1214   gboolean res = FALSE;
1215
1216   switch (GST_QUERY_TYPE (query)) {
1217     case GST_QUERY_BITRATE:{
1218       gint64 size_bytes;
1219       GstClockTime duration;
1220
1221       if (gst_pad_peer_query_duration (base->sinkpad, GST_FORMAT_BYTES,
1222               &size_bytes) && size_bytes > 0) {
1223         if (gst_ts_demux_get_duration (demux, &duration) && duration > 0
1224             && duration != GST_CLOCK_TIME_NONE) {
1225           guint bitrate =
1226               gst_util_uint64_scale (8 * size_bytes, GST_SECOND, duration);
1227
1228           GST_LOG_OBJECT (demux, "bitrate query byte length: %" G_GINT64_FORMAT
1229               " duration %" GST_TIME_FORMAT " resulting in a bitrate of %u",
1230               size_bytes, GST_TIME_ARGS (duration), bitrate);
1231           gst_query_set_bitrate (query, bitrate);
1232           res = TRUE;
1233         }
1234       }
1235       break;
1236     }
1237     default:
1238       res = GST_MPEGTS_BASE_CLASS (parent_class)->sink_query (base, query);
1239       break;
1240   }
1241
1242   return res;
1243 }
1244
1245 static inline void
1246 add_iso639_language_to_tags (TSDemuxStream * stream, gchar * lang_code)
1247 {
1248   const gchar *lc;
1249
1250   GST_LOG_OBJECT (stream->pad, "Add language code for stream: '%s'", lang_code);
1251
1252   if (!stream->taglist)
1253     stream->taglist = gst_tag_list_new_empty ();
1254
1255   /* descriptor contains ISO 639-2 code, we want the ISO 639-1 code */
1256   lc = gst_tag_get_language_code (lang_code);
1257
1258   /* Only set tag if we have a valid one */
1259   if (lc || (lang_code[0] && lang_code[1]))
1260     gst_tag_list_add (stream->taglist, GST_TAG_MERGE_REPLACE,
1261         GST_TAG_LANGUAGE_CODE, (lc) ? lc : lang_code, NULL);
1262 }
1263
1264 static void
1265 gst_ts_demux_create_tags (TSDemuxStream * stream)
1266 {
1267   MpegTSBaseStream *bstream = (MpegTSBaseStream *) stream;
1268   const GstMpegtsDescriptor *desc = NULL;
1269   int i, nb;
1270
1271   desc =
1272       mpegts_get_descriptor_from_stream (bstream,
1273       GST_MTS_DESC_ISO_639_LANGUAGE);
1274   if (desc) {
1275     gchar *lang_code;
1276
1277     nb = gst_mpegts_descriptor_parse_iso_639_language_nb (desc);
1278
1279     GST_DEBUG_OBJECT (stream->pad, "Found ISO 639 descriptor (%d entries)", nb);
1280
1281     for (i = 0; i < nb; i++)
1282       if (gst_mpegts_descriptor_parse_iso_639_language_idx (desc, i, &lang_code,
1283               NULL)) {
1284         add_iso639_language_to_tags (stream, lang_code);
1285         g_free (lang_code);
1286       }
1287
1288     return;
1289   }
1290
1291   desc =
1292       mpegts_get_descriptor_from_stream (bstream, GST_MTS_DESC_DVB_SUBTITLING);
1293
1294   if (desc) {
1295     gchar *lang_code;
1296
1297     nb = gst_mpegts_descriptor_parse_dvb_subtitling_nb (desc);
1298
1299     GST_DEBUG_OBJECT (stream->pad, "Found SUBTITLING descriptor (%d entries)",
1300         nb);
1301
1302     for (i = 0; i < nb; i++)
1303       if (gst_mpegts_descriptor_parse_dvb_subtitling_idx (desc, i, &lang_code,
1304               NULL, NULL, NULL)) {
1305         add_iso639_language_to_tags (stream, lang_code);
1306         g_free (lang_code);
1307       }
1308   }
1309
1310   if (bstream->stream_type == GST_MPEGTS_STREAM_TYPE_PRIVATE_PES_PACKETS) {
1311     desc = mpegts_get_descriptor_from_stream_with_extension (bstream,
1312         GST_MTS_DESC_DVB_EXTENSION, GST_MTS_DESC_EXT_DVB_AUDIO_PRESELECTION);
1313
1314     if (desc) {
1315       GPtrArray *list;
1316       GstMpegtsAudioPreselectionDescriptor *item;
1317
1318       if (gst_mpegts_descriptor_parse_audio_preselection_list (desc, &list)) {
1319         GST_DEBUG ("Found AUDIO PRESELECTION descriptor (%d entries)",
1320             list->len);
1321
1322         for (i = 0; i < list->len; i++) {
1323           item = g_ptr_array_index (list, i);
1324           gst_mpegts_descriptor_parse_audio_preselection_dump (item);
1325
1326           if (item->language_code_present) {
1327             add_iso639_language_to_tags (stream, item->language_code);
1328             break;
1329           }
1330         }
1331         g_ptr_array_unref (list);
1332       }
1333     }
1334   }
1335 }
1336
1337 static GstPad *
1338 create_pad_for_stream (MpegTSBase * base, MpegTSBaseStream * bstream,
1339     MpegTSBaseProgram * program)
1340 {
1341   GstTSDemux *demux = GST_TS_DEMUX (base);
1342   TSDemuxStream *stream = (TSDemuxStream *) bstream;
1343   gchar *name = NULL;
1344   GstCaps *caps = NULL;
1345   GstPadTemplate *template = NULL;
1346   const GstMpegtsDescriptor *desc = NULL;
1347   GstPad *pad = NULL;
1348   gboolean sparse = FALSE;
1349   gboolean is_audio = FALSE, is_video = FALSE, is_subpicture = FALSE,
1350       is_private = FALSE;
1351
1352   gst_ts_demux_create_tags (stream);
1353
1354   GST_LOG_OBJECT (demux,
1355       "Attempting to create pad for stream 0x%04x with stream_type %d",
1356       bstream->pid, bstream->stream_type);
1357
1358   /* First handle BluRay-specific stream types since there is some overlap
1359    * between BluRay and non-BluRay streay type identifiers */
1360   if (program->registration_id == DRF_ID_HDMV) {
1361     switch (bstream->stream_type) {
1362       case ST_BD_AUDIO_AC3:
1363       {
1364         const GstMpegtsDescriptor *ac3_desc;
1365
1366         /* ATSC ac3 audio descriptor */
1367         ac3_desc =
1368             mpegts_get_descriptor_from_stream (bstream,
1369             GST_MTS_DESC_AC3_AUDIO_STREAM);
1370         if (ac3_desc && DESC_AC_AUDIO_STREAM_bsid (ac3_desc->data) != 16) {
1371           GST_LOG_OBJECT (demux, "ac3 audio");
1372           is_audio = TRUE;
1373           caps = gst_caps_new_empty_simple ("audio/x-ac3");
1374         } else {
1375           is_audio = TRUE;
1376           caps = gst_caps_new_empty_simple ("audio/x-eac3");
1377         }
1378         break;
1379       }
1380       case ST_BD_AUDIO_EAC3:
1381       case ST_BD_AUDIO_AC3_PLUS:
1382         is_audio = TRUE;
1383         caps = gst_caps_new_empty_simple ("audio/x-eac3");
1384         break;
1385       case ST_BD_AUDIO_AC3_TRUE_HD:
1386         is_audio = TRUE;
1387         caps = gst_caps_new_empty_simple ("audio/x-true-hd");
1388         stream->target_pes_substream = 0x72;
1389         break;
1390       case ST_BD_AUDIO_LPCM:
1391         is_audio = TRUE;
1392         caps = gst_caps_new_empty_simple ("audio/x-private-ts-lpcm");
1393         break;
1394       case ST_BD_PGS_SUBPICTURE:
1395         is_subpicture = TRUE;
1396         caps = gst_caps_new_empty_simple ("subpicture/x-pgs");
1397         sparse = TRUE;
1398         break;
1399       case ST_BD_AUDIO_DTS_HD:
1400       case ST_BD_AUDIO_DTS_HD_MASTER_AUDIO:
1401         is_audio = TRUE;
1402         caps = gst_caps_new_empty_simple ("audio/x-dts");
1403         stream->target_pes_substream = 0x71;
1404         break;
1405     }
1406   }
1407
1408   if (caps)
1409     goto done;
1410
1411   /* Handle non-BluRay stream types */
1412   switch (bstream->stream_type) {
1413     case GST_MPEGTS_STREAM_TYPE_VIDEO_MPEG1:
1414     case GST_MPEGTS_STREAM_TYPE_VIDEO_MPEG2:
1415     case ST_PS_VIDEO_MPEG2_DCII:
1416       /* FIXME : Use DCII registration code (ETV1 ?) to handle that special
1417        * Stream type (ST_PS_VIDEO_MPEG2_DCII) */
1418       /* FIXME : Use video descriptor (0x1) to refine caps with:
1419        * * frame_rate
1420        * * profile_and_level
1421        */
1422       GST_LOG_OBJECT (demux, "mpeg video");
1423       is_video = TRUE;
1424       caps = gst_caps_new_simple ("video/mpeg",
1425           "mpegversion", G_TYPE_INT,
1426           bstream->stream_type == GST_MPEGTS_STREAM_TYPE_VIDEO_MPEG1 ? 1 : 2,
1427           "systemstream", G_TYPE_BOOLEAN, FALSE, NULL);
1428
1429       break;
1430     case GST_MPEGTS_STREAM_TYPE_AUDIO_MPEG1:
1431     case GST_MPEGTS_STREAM_TYPE_AUDIO_MPEG2:
1432       GST_LOG_OBJECT (demux, "mpeg audio");
1433       is_audio = TRUE;
1434       caps =
1435           gst_caps_new_simple ("audio/mpeg", "mpegversion", G_TYPE_INT, 1,
1436           NULL);
1437       /* HDV is always mpeg 1 audio layer 2 */
1438       if (program->registration_id == DRF_ID_TSHV)
1439         gst_caps_set_simple (caps, "layer", G_TYPE_INT, 2, NULL);
1440       break;
1441     case GST_MPEGTS_STREAM_TYPE_PRIVATE_PES_PACKETS:
1442       GST_LOG_OBJECT (demux, "private data");
1443       /* FIXME: Move all of this into a common method (there might be other
1444        * types also, depending on registratino descriptors also
1445        */
1446
1447       desc = mpegts_get_descriptor_from_stream_with_extension (bstream,
1448           GST_MTS_DESC_DVB_EXTENSION, GST_MTS_DESC_EXT_DVB_AC4);
1449       if (desc) {
1450         GST_LOG_OBJECT (demux, "ac4 audio");
1451         is_audio = TRUE;
1452         caps = gst_caps_new_empty_simple ("audio/x-ac4");
1453         break;
1454       }
1455
1456       desc = mpegts_get_descriptor_from_stream (bstream, GST_MTS_DESC_DVB_AC3);
1457       if (desc) {
1458         GST_LOG_OBJECT (demux, "ac3 audio");
1459         is_audio = TRUE;
1460         caps = gst_caps_new_empty_simple ("audio/x-ac3");
1461         break;
1462       }
1463
1464       desc =
1465           mpegts_get_descriptor_from_stream (bstream,
1466           GST_MTS_DESC_DVB_ENHANCED_AC3);
1467       if (desc) {
1468         GST_LOG_OBJECT (demux, "ac3 audio");
1469         is_audio = TRUE;
1470         caps = gst_caps_new_empty_simple ("audio/x-eac3");
1471         break;
1472       }
1473       desc =
1474           mpegts_get_descriptor_from_stream (bstream,
1475           GST_MTS_DESC_DVB_TELETEXT);
1476       if (desc) {
1477         GST_LOG_OBJECT (demux, "teletext");
1478         is_private = TRUE;
1479         caps = gst_caps_new_empty_simple ("application/x-teletext");
1480         sparse = TRUE;
1481         break;
1482       }
1483       desc =
1484           mpegts_get_descriptor_from_stream (bstream,
1485           GST_MTS_DESC_DVB_SUBTITLING);
1486       if (desc) {
1487         GST_LOG_OBJECT (demux, "subtitling");
1488         is_subpicture = TRUE;
1489         caps = gst_caps_new_empty_simple ("subpicture/x-dvb");
1490         sparse = TRUE;
1491         break;
1492       }
1493
1494       switch (bstream->registration_id) {
1495         case DRF_ID_DTS1:
1496         case DRF_ID_DTS2:
1497         case DRF_ID_DTS3:
1498           /* SMPTE registered DTS */
1499           is_private = TRUE;
1500           caps = gst_caps_new_empty_simple ("audio/x-dts");
1501           break;
1502         case DRF_ID_S302M:
1503           is_audio = TRUE;
1504           caps = gst_caps_new_empty_simple ("audio/x-smpte-302m");
1505           break;
1506         case DRF_ID_OPUS:
1507           desc = mpegts_get_descriptor_from_stream (bstream,
1508               GST_MTS_DESC_DVB_EXTENSION);
1509           if (desc != NULL && desc->tag_extension == 0x80 && desc->length >= 1) {       /* User defined (provisional Opus) */
1510             guint8 channel_config_code;
1511             GstByteReader br;
1512
1513             /* skip tag, length and tag_extension */
1514             gst_byte_reader_init (&br, desc->data + 3, desc->length - 1);
1515             channel_config_code = gst_byte_reader_get_uint8_unchecked (&br);
1516
1517             if ((channel_config_code & 0x8f) <= 8) {
1518               static const guint8 coupled_stream_counts[9] = {
1519                 1, 0, 1, 1, 2, 2, 2, 3, 3
1520               };
1521               static const guint8 channel_map_a[8][8] = {
1522                 {0},
1523                 {0, 1},
1524                 {0, 2, 1},
1525                 {0, 1, 2, 3},
1526                 {0, 4, 1, 2, 3},
1527                 {0, 4, 1, 2, 3, 5},
1528                 {0, 4, 1, 2, 3, 5, 6},
1529                 {0, 6, 1, 2, 3, 4, 5, 7},
1530               };
1531               static const guint8 channel_map_b[8][8] = {
1532                 {0},
1533                 {0, 1},
1534                 {0, 1, 2},
1535                 {0, 1, 2, 3},
1536                 {0, 1, 2, 3, 4},
1537                 {0, 1, 2, 3, 4, 5},
1538                 {0, 1, 2, 3, 4, 5, 6},
1539                 {0, 1, 2, 3, 4, 5, 6, 7},
1540               };
1541
1542               gint channels = -1, stream_count, coupled_count, mapping_family;
1543               guint8 *channel_mapping = NULL;
1544
1545               channels = channel_config_code ? (channel_config_code & 0x0f) : 2;
1546               if (channel_config_code == 0 || channel_config_code == 0x80) {
1547                 /* Dual Mono */
1548                 mapping_family = 255;
1549                 if (channel_config_code == 0) {
1550                   stream_count = 1;
1551                   coupled_count = 1;
1552                 } else {
1553                   stream_count = 2;
1554                   coupled_count = 0;
1555                 }
1556                 channel_mapping = g_new0 (guint8, channels);
1557                 memcpy (channel_mapping, &channel_map_a[1], channels);
1558               } else if (channel_config_code <= 8) {
1559                 mapping_family = (channels > 2) ? 1 : 0;
1560                 stream_count =
1561                     channel_config_code -
1562                     coupled_stream_counts[channel_config_code];
1563                 coupled_count = coupled_stream_counts[channel_config_code];
1564                 if (mapping_family != 0) {
1565                   channel_mapping = g_new0 (guint8, channels);
1566                   memcpy (channel_mapping, &channel_map_a[channels - 1],
1567                       channels);
1568                 }
1569               } else if (channel_config_code >= 0x82
1570                   && channel_config_code <= 0x88) {
1571                 mapping_family = 1;
1572                 stream_count = channels;
1573                 coupled_count = 0;
1574                 channel_mapping = g_new0 (guint8, channels);
1575                 memcpy (channel_mapping, &channel_map_b[channels - 1],
1576                     channels);
1577               } else if (channel_config_code == 0x81) {
1578                 if (gst_byte_reader_get_remaining (&br) < 2) {
1579                   GST_WARNING_OBJECT (demux,
1580                       "Invalid Opus descriptor with extended channel configuration");
1581                   channels = -1;
1582                   break;
1583                 }
1584
1585                 channels = gst_byte_reader_get_uint8_unchecked (&br);
1586                 mapping_family = gst_byte_reader_get_uint8_unchecked (&br);
1587
1588                 /* Overwrite values from above */
1589                 if (channels == 0) {
1590                   GST_WARNING_OBJECT (demux,
1591                       "Invalid Opus descriptor with extended channel configuration");
1592                   channels = -1;
1593                   break;
1594                 }
1595
1596                 if (mapping_family == 0 && channels <= 2) {
1597                   stream_count = channels - coupled_stream_counts[channels];
1598                   coupled_count = coupled_stream_counts[channels];
1599                 } else {
1600                   GstBitReader breader;
1601                   guint8 stream_count_minus_one, coupled_stream_count;
1602                   gint stream_count_minus_one_len, coupled_stream_count_len;
1603                   gint channel_mapping_len, i;
1604                   guint remaining_bytes;
1605
1606                   remaining_bytes = gst_byte_reader_get_remaining (&br);
1607                   gst_bit_reader_init (&breader,
1608                       gst_byte_reader_get_data_unchecked
1609                       (&br, remaining_bytes), remaining_bytes);
1610
1611                   stream_count_minus_one_len = ceil (_gst_log2 (channels));
1612                   if (!gst_bit_reader_get_bits_uint8 (&breader,
1613                           &stream_count_minus_one,
1614                           stream_count_minus_one_len)) {
1615                     GST_WARNING_OBJECT (demux,
1616                         "Invalid Opus descriptor with extended channel configuration");
1617                     channels = -1;
1618                     break;
1619                   }
1620
1621                   stream_count = stream_count_minus_one + 1;
1622                   coupled_stream_count_len =
1623                       ceil (_gst_log2 (stream_count_minus_one + 2));
1624
1625                   if (!gst_bit_reader_get_bits_uint8 (&breader,
1626                           &coupled_stream_count, coupled_stream_count_len)) {
1627                     GST_WARNING_OBJECT (demux,
1628                         "Invalid Opus descriptor with extended channel configuration");
1629                     channels = -1;
1630                     break;
1631                   }
1632
1633                   coupled_count = coupled_stream_count;
1634
1635                   channel_mapping_len =
1636                       ceil (_gst_log2 (stream_count_minus_one + 1 +
1637                           coupled_stream_count + 1));
1638                   channel_mapping = g_new0 (guint8, channels);
1639                   for (i = 0; i < channels; i++) {
1640                     if (!gst_bit_reader_get_bits_uint8 (&breader,
1641                             &channel_mapping[i], channel_mapping_len)) {
1642                       GST_WARNING_OBJECT (demux,
1643                           "Invalid Opus descriptor with extended channel configuration");
1644                       break;
1645                     }
1646                   }
1647
1648                   /* error above */
1649                   if (i != channels) {
1650                     channels = -1;
1651                     g_free (channel_mapping);
1652                     channel_mapping = NULL;
1653                     break;
1654                   }
1655                 }
1656               } else {
1657                 g_assert_not_reached ();
1658               }
1659
1660               if (channels != -1) {
1661                 is_audio = TRUE;
1662                 caps =
1663                     gst_codec_utils_opus_create_caps (48000, channels,
1664                     mapping_family, stream_count, coupled_count,
1665                     channel_mapping);
1666
1667                 g_free (channel_mapping);
1668               }
1669             } else {
1670               GST_WARNING_OBJECT (demux,
1671                   "unexpected channel config code 0x%02x", channel_config_code);
1672             }
1673           } else {
1674             GST_WARNING_OBJECT (demux, "Opus, but no extension descriptor");
1675           }
1676           break;
1677         case DRF_ID_HEVC:
1678           is_video = TRUE;
1679           caps = gst_caps_new_simple ("video/x-h265",
1680               "stream-format", G_TYPE_STRING, "byte-stream", NULL);
1681           break;
1682         case DRF_ID_KLVA:
1683           sparse = TRUE;
1684           is_private = TRUE;
1685           caps = gst_caps_new_simple ("meta/x-klv",
1686               "parsed", G_TYPE_BOOLEAN, TRUE, NULL);
1687           break;
1688         case DRF_ID_AC4:
1689           is_audio = TRUE;
1690           caps = gst_caps_new_empty_simple ("audio/x-ac4");
1691           break;
1692       }
1693       if (caps)
1694         break;
1695
1696       /* hack for itv hd (sid 10510, video pid 3401 */
1697       if (program->program_number == 10510 && bstream->pid == 3401) {
1698         is_video = TRUE;
1699         caps = gst_caps_new_simple ("video/x-h264",
1700             "stream-format", G_TYPE_STRING, "byte-stream", NULL);
1701       }
1702       break;
1703     case ST_HDV_AUX_V:
1704       /* FIXME : Should only be used with specific PMT registration_descriptor */
1705       /* We don't expose those streams since they're only helper streams */
1706       /* template = gst_static_pad_template_get (&private_template); */
1707       /* name = g_strdup_printf ("private_%04x", bstream->pid); */
1708       /* caps = gst_caps_new_simple ("hdv/aux-v", NULL); */
1709       break;
1710     case ST_HDV_AUX_A:
1711       /* FIXME : Should only be used with specific PMT registration_descriptor */
1712       /* We don't expose those streams since they're only helper streams */
1713       /* template = gst_static_pad_template_get (&private_template); */
1714       /* name = g_strdup_printf ("private_%04x", bstream->pid); */
1715       /* caps = gst_caps_new_simple ("hdv/aux-a", NULL); */
1716       break;
1717     case GST_MPEGTS_STREAM_TYPE_AUDIO_AAC_ADTS:
1718       is_audio = TRUE;
1719       /* prefer mpegversion 4 since it's more commonly supported one */
1720       caps = gst_caps_new_simple ("audio/mpeg",
1721           "mpegversion", G_TYPE_INT, 4,
1722           "stream-format", G_TYPE_STRING, "adts", NULL);
1723       /* we will set caps later once parsing adts header is done */
1724       stream->atdsInfos.mpegversion = 4;
1725       break;
1726     case GST_MPEGTS_STREAM_TYPE_AUDIO_AAC_LATM:
1727       is_audio = TRUE;
1728       caps = gst_caps_new_simple ("audio/mpeg",
1729           "mpegversion", G_TYPE_INT, 4,
1730           "stream-format", G_TYPE_STRING, "loas", NULL);
1731       break;
1732     case GST_MPEGTS_STREAM_TYPE_VIDEO_MPEG4:
1733       is_video = TRUE;
1734       caps = gst_caps_new_simple ("video/mpeg",
1735           "mpegversion", G_TYPE_INT, 4,
1736           "systemstream", G_TYPE_BOOLEAN, FALSE, NULL);
1737       break;
1738     case GST_MPEGTS_STREAM_TYPE_VIDEO_H264:
1739       is_video = TRUE;
1740       caps = gst_caps_new_simple ("video/x-h264",
1741           "stream-format", G_TYPE_STRING, "byte-stream", NULL);
1742       break;
1743     case GST_MPEGTS_STREAM_TYPE_VIDEO_HEVC:
1744       is_video = TRUE;
1745       caps = gst_caps_new_simple ("video/x-h265",
1746           "stream-format", G_TYPE_STRING, "byte-stream", NULL);
1747       break;
1748     case GST_MPEGTS_STREAM_TYPE_VIDEO_JP2K:
1749       is_video = TRUE;
1750       desc =
1751           mpegts_get_descriptor_from_stream (bstream, GST_MTS_DESC_J2K_VIDEO);
1752       if (desc == NULL) {
1753         caps = gst_caps_new_empty_simple ("image/x-jpc");
1754         break;
1755       } else {
1756         GstByteReader br;
1757         guint16 DEN_frame_rate = 0;
1758         guint16 NUM_frame_rate = 0;
1759         guint8 color_specification = 0;
1760         guint8 remaining_8b = 0;
1761         gboolean interlaced_video = 0;
1762         const gchar *interlace_mode = NULL;
1763         const gchar *colorspace = NULL;
1764         const gchar *colorimetry_mode = NULL;
1765         guint16 profile_and_level G_GNUC_UNUSED;
1766         guint32 horizontal_size G_GNUC_UNUSED;
1767         guint32 vertical_size G_GNUC_UNUSED;
1768         guint32 max_bit_rate G_GNUC_UNUSED;
1769         guint32 max_buffer_size G_GNUC_UNUSED;
1770         const guint desc_min_length = 24;
1771
1772         if (desc->length < desc_min_length) {
1773           GST_ERROR_OBJECT (demux,
1774               "GST_MPEGTS_STREAM_TYPE_VIDEO_JP2K: descriptor length %d too short",
1775               desc->length);
1776           return NULL;
1777         }
1778
1779         /* Skip the descriptor tag and length */
1780         gst_byte_reader_init (&br, desc->data + 2, desc->length);
1781
1782         profile_and_level = gst_byte_reader_get_uint16_be_unchecked (&br);
1783         horizontal_size = gst_byte_reader_get_uint32_be_unchecked (&br);
1784         vertical_size = gst_byte_reader_get_uint32_be_unchecked (&br);
1785         max_bit_rate = gst_byte_reader_get_uint32_be_unchecked (&br);
1786         max_buffer_size = gst_byte_reader_get_uint32_be_unchecked (&br);
1787         DEN_frame_rate = gst_byte_reader_get_uint16_be_unchecked (&br);
1788         NUM_frame_rate = gst_byte_reader_get_uint16_be_unchecked (&br);
1789         color_specification = gst_byte_reader_get_uint8_unchecked (&br);
1790         remaining_8b = gst_byte_reader_get_uint8_unchecked (&br);
1791         interlaced_video = remaining_8b & 0x40;
1792         /* we don't support demuxing interlaced at the moment */
1793         if (interlaced_video) {
1794           GST_ERROR_OBJECT (demux,
1795               "GST_MPEGTS_STREAM_TYPE_VIDEO_JP2K: interlaced video not supported");
1796           return NULL;
1797         } else {
1798           interlace_mode = "progressive";
1799           stream->jp2kInfos.interlace = FALSE;
1800         }
1801         switch (color_specification) {
1802           case GST_MPEGTSDEMUX_JPEG2000_COLORSPEC_SRGB:
1803             colorspace = "sRGB";
1804             colorimetry_mode = GST_VIDEO_COLORIMETRY_SRGB;
1805             break;
1806           case GST_MPEGTSDEMUX_JPEG2000_COLORSPEC_REC601:
1807             colorspace = "sYUV";
1808             colorimetry_mode = GST_VIDEO_COLORIMETRY_BT601;
1809             break;
1810           case GST_MPEGTSDEMUX_JPEG2000_COLORSPEC_REC709:
1811           case GST_MPEGTSDEMUX_JPEG2000_COLORSPEC_CIELUV:
1812             colorspace = "sYUV";
1813             colorimetry_mode = GST_VIDEO_COLORIMETRY_BT709;
1814             break;
1815           default:
1816             break;
1817         }
1818         caps = gst_caps_new_simple ("image/x-jpc",
1819             "framerate", GST_TYPE_FRACTION, NUM_frame_rate, DEN_frame_rate,
1820             "interlace-mode", G_TYPE_STRING, interlace_mode,
1821             "colorimetry", G_TYPE_STRING, colorimetry_mode,
1822             "colorspace", G_TYPE_STRING, colorspace, NULL);
1823       }
1824       break;
1825     case ST_VIDEO_DIRAC:
1826       if (bstream->registration_id == 0x64726163) {
1827         GST_LOG_OBJECT (demux, "dirac");
1828         /* dirac in hex */
1829         is_video = TRUE;
1830         caps = gst_caps_new_empty_simple ("video/x-dirac");
1831       }
1832       break;
1833     case ST_PRIVATE_EA:        /* Try to detect a VC1 stream */
1834     {
1835       gboolean is_vc1 = FALSE;
1836
1837       /* Note/FIXME: RP-227 specifies that the registration descriptor
1838        * for vc1 can also contain other information, such as profile,
1839        * level, alignment, buffer_size, .... */
1840       if (bstream->registration_id == DRF_ID_VC1)
1841         is_vc1 = TRUE;
1842       if (!is_vc1) {
1843         GST_WARNING_OBJECT (demux, "0xea private stream type found but "
1844             "no descriptor for VC1. Assuming plain VC1.");
1845       }
1846
1847       is_video = TRUE;
1848       caps = gst_caps_new_simple ("video/x-wmv",
1849           "wmvversion", G_TYPE_INT, 3, "format", G_TYPE_STRING, "WVC1", NULL);
1850
1851       break;
1852     }
1853     case ST_PS_AUDIO_AC3:
1854       /* DVB_ENHANCED_AC3 */
1855       desc =
1856           mpegts_get_descriptor_from_stream (bstream,
1857           GST_MTS_DESC_DVB_ENHANCED_AC3);
1858       if (desc) {
1859         is_audio = TRUE;
1860         caps = gst_caps_new_empty_simple ("audio/x-eac3");
1861         break;
1862       }
1863
1864       /* If stream has ac3 descriptor
1865        * OR program is ATSC (GA94)
1866        * OR stream registration is AC-3
1867        * then it's regular AC3 */
1868       if (bstream->registration_id == DRF_ID_AC3 ||
1869           program->registration_id == DRF_ID_GA94 ||
1870           mpegts_get_descriptor_from_stream (bstream, GST_MTS_DESC_DVB_AC3)) {
1871         is_audio = TRUE;
1872         caps = gst_caps_new_empty_simple ("audio/x-ac3");
1873         break;
1874       }
1875
1876       GST_WARNING_OBJECT (demux,
1877           "AC3 stream type found but no guaranteed "
1878           "way found to differentiate between AC3 and EAC3. "
1879           "Assuming plain AC3.");
1880       is_audio = TRUE;
1881       caps = gst_caps_new_empty_simple ("audio/x-ac3");
1882       break;
1883     case ST_PS_AUDIO_EAC3:
1884     {
1885       /* ATSC_ENHANCED_AC3 */
1886       if (bstream->registration_id == DRF_ID_EAC3 ||
1887           mpegts_get_descriptor_from_stream (bstream, GST_MTS_DESC_ATSC_EAC3)) {
1888         is_audio = TRUE;
1889         caps = gst_caps_new_empty_simple ("audio/x-eac3");
1890         break;
1891       }
1892
1893       GST_ELEMENT_WARNING (demux, STREAM, DEMUX,
1894           ("Assuming ATSC E-AC3 audio stream."),
1895           ("ATSC E-AC3 stream type found but no guarantee way found to "
1896               "differentiate among other standards (DVB, ISDB and etc..)"));
1897
1898       is_audio = TRUE;
1899       caps = gst_caps_new_empty_simple ("audio/x-eac3");
1900       break;
1901     }
1902     case ST_PS_AUDIO_LPCM2:
1903       is_audio = TRUE;
1904       caps = gst_caps_new_empty_simple ("audio/x-private2-lpcm");
1905       break;
1906     case ST_PS_AUDIO_DTS:
1907       is_audio = TRUE;
1908       caps = gst_caps_new_empty_simple ("audio/x-dts");
1909       break;
1910     case ST_PS_AUDIO_LPCM:
1911       is_audio = TRUE;
1912       caps = gst_caps_new_empty_simple ("audio/x-lpcm");
1913       break;
1914     case ST_PS_DVD_SUBPICTURE:
1915       is_subpicture = TRUE;
1916       caps = gst_caps_new_empty_simple ("subpicture/x-dvd");
1917       sparse = TRUE;
1918       break;
1919     case 0x42:
1920       /* hack for Chinese AVS video stream which use 0x42 as stream_id
1921        * NOTE: this is unofficial and within the ISO reserved range. */
1922       is_video = TRUE;
1923       caps = gst_caps_new_empty_simple ("video/x-cavs");
1924       break;
1925     default:
1926       GST_DEBUG_OBJECT (demux,
1927           "Non-media stream (stream_type:0x%x). Not creating pad",
1928           bstream->stream_type);
1929       break;
1930   }
1931
1932 done:
1933   if (caps) {
1934     if (is_audio) {
1935       template = gst_static_pad_template_get (&audio_template);
1936       name =
1937           g_strdup_printf ("audio_%01x_%04x", demux->program_generation,
1938           bstream->pid);
1939       gst_stream_set_stream_type (bstream->stream_object,
1940           GST_STREAM_TYPE_AUDIO);
1941     } else if (is_video) {
1942       template = gst_static_pad_template_get (&video_template);
1943       name =
1944           g_strdup_printf ("video_%01x_%04x", demux->program_generation,
1945           bstream->pid);
1946       gst_stream_set_stream_type (bstream->stream_object,
1947           GST_STREAM_TYPE_VIDEO);
1948     } else if (is_private) {
1949       template = gst_static_pad_template_get (&private_template);
1950       name =
1951           g_strdup_printf ("private_%01x_%04x", demux->program_generation,
1952           bstream->pid);
1953     } else if (is_subpicture) {
1954       template = gst_static_pad_template_get (&subpicture_template);
1955       name =
1956           g_strdup_printf ("subpicture_%01x_%04x", demux->program_generation,
1957           bstream->pid);
1958       gst_stream_set_stream_type (bstream->stream_object, GST_STREAM_TYPE_TEXT);
1959     } else
1960       g_assert_not_reached ();
1961
1962   }
1963
1964   if (template && name && caps) {
1965     GstEvent *event;
1966     const gchar *stream_id;
1967
1968     GST_LOG_OBJECT (demux,
1969         "stream:%p creating pad with name %s and caps %" GST_PTR_FORMAT,
1970         stream, name, caps);
1971     pad = gst_pad_new_from_template (template, name);
1972     gst_pad_set_active (pad, TRUE);
1973     gst_pad_use_fixed_caps (pad);
1974     stream_id = gst_stream_get_stream_id (bstream->stream_object);
1975
1976     event = gst_pad_get_sticky_event (base->sinkpad, GST_EVENT_STREAM_START, 0);
1977     if (event) {
1978       if (gst_event_parse_group_id (event, &demux->group_id))
1979         demux->have_group_id = TRUE;
1980       else
1981         demux->have_group_id = FALSE;
1982       gst_event_unref (event);
1983     } else if (!demux->have_group_id) {
1984       demux->have_group_id = TRUE;
1985       demux->group_id = gst_util_group_id_next ();
1986     }
1987     event = gst_event_new_stream_start (stream_id);
1988     gst_event_set_stream (event, bstream->stream_object);
1989     if (demux->have_group_id)
1990       gst_event_set_group_id (event, demux->group_id);
1991     if (sparse) {
1992       gst_event_set_stream_flags (event, GST_STREAM_FLAG_SPARSE);
1993       gst_stream_set_stream_flags (bstream->stream_object,
1994           GST_STREAM_FLAG_SPARSE);
1995     }
1996     stream->sparse = sparse;
1997     gst_stream_set_caps (bstream->stream_object, caps);
1998     if (!stream->taglist)
1999       stream->taglist = gst_tag_list_new_empty ();
2000     gst_pb_utils_add_codec_description_to_tag_list (stream->taglist, NULL,
2001         caps);
2002     gst_stream_set_tags (bstream->stream_object, stream->taglist);
2003
2004     gst_pad_push_event (pad, event);
2005     gst_pad_set_caps (pad, caps);
2006     gst_pad_set_query_function (pad, gst_ts_demux_srcpad_query);
2007     gst_pad_set_event_function (pad, gst_ts_demux_srcpad_event);
2008   }
2009
2010   g_free (name);
2011   if (template)
2012     gst_object_unref (template);
2013   if (caps)
2014     gst_caps_unref (caps);
2015
2016   return pad;
2017 }
2018
2019 static gboolean
2020 gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * bstream,
2021     MpegTSBaseProgram * program)
2022 {
2023   GstTSDemux *demux = (GstTSDemux *) base;
2024   TSDemuxStream *stream = (TSDemuxStream *) bstream;
2025
2026   if (!stream->pad) {
2027     /* Create the pad */
2028     if (bstream->stream_type != 0xff) {
2029       stream->pad = create_pad_for_stream (base, bstream, program);
2030       if (stream->pad)
2031         gst_flow_combiner_add_pad (demux->flowcombiner, stream->pad);
2032     }
2033
2034     if (base->mode != BASE_MODE_PUSHING
2035         && bstream->stream_type == GST_MPEGTS_STREAM_TYPE_VIDEO_H264) {
2036       stream->scan_function =
2037           (GstTsDemuxKeyFrameScanFunction) scan_keyframe_h264;
2038     } else {
2039       stream->scan_function = NULL;
2040     }
2041
2042     stream->active = FALSE;
2043
2044     stream->need_newsegment = TRUE;
2045     /* Reset segment if we're not doing an accurate seek */
2046     demux->reset_segment =
2047         (!(base->out_segment.flags & GST_SEEK_FLAG_ACCURATE));
2048     stream->needs_keyframe = FALSE;
2049     stream->discont = TRUE;
2050     stream->pts = GST_CLOCK_TIME_NONE;
2051     stream->dts = GST_CLOCK_TIME_NONE;
2052     stream->first_pts = GST_CLOCK_TIME_NONE;
2053     stream->raw_pts = -1;
2054     stream->raw_dts = -1;
2055     stream->pending_ts = TRUE;
2056     stream->nb_out_buffers = 0;
2057     stream->gap_ref_buffers = 0;
2058     stream->gap_ref_pts = GST_CLOCK_TIME_NONE;
2059     /* Only wait for a valid timestamp if we have a PCR_PID */
2060     stream->pending_ts = program->pcr_pid < 0x1fff;
2061     stream->continuity_counter = CONTINUITY_UNSET;
2062   }
2063
2064   return (stream->pad != NULL);
2065 }
2066
2067 static void
2068 tsdemux_h264_parsing_info_clear (TSDemuxH264ParsingInfos * h264infos)
2069 {
2070   clear_simple_buffer (&h264infos->framedata);
2071
2072   if (h264infos->parser) {
2073     gst_h264_nal_parser_free (h264infos->parser);
2074     gst_byte_writer_free (h264infos->sps);
2075     gst_byte_writer_free (h264infos->pps);
2076     gst_byte_writer_free (h264infos->sei);
2077   }
2078 }
2079
2080 static void
2081 gst_ts_demux_stream_removed (MpegTSBase * base, MpegTSBaseStream * bstream)
2082 {
2083   TSDemuxStream *stream = (TSDemuxStream *) bstream;
2084
2085   if (stream->pad) {
2086     gst_flow_combiner_remove_pad (GST_TS_DEMUX_CAST (base)->flowcombiner,
2087         stream->pad);
2088     if (stream->active) {
2089
2090       if (gst_pad_is_active (stream->pad)) {
2091         /* Flush out all data */
2092         GST_DEBUG_OBJECT (stream->pad, "Flushing out pending data");
2093         gst_ts_demux_push_pending_data ((GstTSDemux *) base, stream, NULL);
2094
2095         GST_DEBUG_OBJECT (stream->pad, "Pushing out EOS");
2096         gst_pad_push_event (stream->pad, gst_event_new_eos ());
2097         gst_pad_set_active (stream->pad, FALSE);
2098       }
2099
2100       GST_DEBUG_OBJECT (stream->pad, "Removing pad");
2101       gst_element_remove_pad (GST_ELEMENT_CAST (base), stream->pad);
2102       stream->active = FALSE;
2103     } else {
2104       gst_object_unref (stream->pad);
2105     }
2106     stream->pad = NULL;
2107   }
2108
2109   gst_ts_demux_stream_flush (stream, GST_TS_DEMUX_CAST (base), TRUE);
2110
2111   if (stream->taglist != NULL) {
2112     gst_tag_list_unref (stream->taglist);
2113     stream->taglist = NULL;
2114   }
2115
2116   tsdemux_h264_parsing_info_clear (&stream->h264infos);
2117 }
2118
2119 static void
2120 activate_pad_for_stream (GstTSDemux * tsdemux, TSDemuxStream * stream)
2121 {
2122   if (stream->pad) {
2123     GST_DEBUG_OBJECT (tsdemux, "Activating pad %s:%s for stream %p",
2124         GST_DEBUG_PAD_NAME (stream->pad), stream);
2125     gst_element_add_pad ((GstElement *) tsdemux, stream->pad);
2126     stream->active = TRUE;
2127     GST_DEBUG_OBJECT (stream->pad, "done adding pad");
2128   } else if (((MpegTSBaseStream *) stream)->stream_type != 0xff) {
2129     GST_DEBUG_OBJECT (tsdemux,
2130         "stream %p (pid 0x%04x, type:0x%02x) has no pad", stream,
2131         ((MpegTSBaseStream *) stream)->pid,
2132         ((MpegTSBaseStream *) stream)->stream_type);
2133   }
2134 }
2135
2136 static void
2137 gst_ts_demux_stream_flush (TSDemuxStream * stream, GstTSDemux * tsdemux,
2138     gboolean hard)
2139 {
2140   GST_DEBUG ("flushing stream %p", stream);
2141
2142   g_free (stream->data);
2143   stream->data = NULL;
2144   g_free (stream->pending_header_data);
2145   stream->pending_header_data = NULL;
2146   stream->pending_header_size = 0;
2147   stream->state = PENDING_PACKET_EMPTY;
2148   stream->expected_size = 0;
2149   stream->allocated_size = 0;
2150   stream->current_size = 0;
2151   stream->discont = TRUE;
2152   stream->pts = GST_CLOCK_TIME_NONE;
2153   stream->dts = GST_CLOCK_TIME_NONE;
2154   stream->raw_pts = -1;
2155   stream->raw_dts = -1;
2156   stream->pending_ts = TRUE;
2157   stream->nb_out_buffers = 0;
2158   stream->gap_ref_buffers = 0;
2159   stream->gap_ref_pts = GST_CLOCK_TIME_NONE;
2160   stream->continuity_counter = CONTINUITY_UNSET;
2161
2162   if (G_UNLIKELY (stream->pending)) {
2163     GList *tmp;
2164
2165     GST_DEBUG ("clearing pending %p", stream);
2166     for (tmp = stream->pending; tmp; tmp = tmp->next) {
2167       PendingBuffer *pend = (PendingBuffer *) tmp->data;
2168       gst_buffer_unref (pend->buffer);
2169       g_slice_free (PendingBuffer, pend);
2170     }
2171     g_list_free (stream->pending);
2172     stream->pending = NULL;
2173   }
2174
2175   if (hard) {
2176     stream->first_pts = GST_CLOCK_TIME_NONE;
2177     stream->need_newsegment = TRUE;
2178   }
2179 }
2180
2181 static void
2182 gst_ts_demux_flush_streams (GstTSDemux * demux, gboolean hard)
2183 {
2184   GList *walk;
2185   if (!demux->program)
2186     return;
2187
2188   for (walk = demux->program->stream_list; walk; walk = g_list_next (walk))
2189     gst_ts_demux_stream_flush (walk->data, demux, hard);
2190 }
2191
2192 static gboolean
2193 gst_ts_demux_can_remove_program (MpegTSBase * base, MpegTSBaseProgram * program)
2194 {
2195   GstTSDemux *demux = GST_TS_DEMUX (base);
2196
2197   /* If it's our current active program, we return FALSE, we'll deactivate it
2198    * ourselves when the next program gets activated */
2199   if (demux->program == program) {
2200     GST_DEBUG
2201         ("Attempting to remove current program, delaying until new program gets activated");
2202     demux->previous_program = program;
2203     demux->program_number = -1;
2204     return FALSE;
2205   }
2206   return TRUE;
2207 }
2208
2209 static void
2210 gst_ts_demux_update_program (MpegTSBase * base, MpegTSBaseProgram * program)
2211 {
2212   GstTSDemux *demux = GST_TS_DEMUX (base);
2213   GList *tmp;
2214
2215   GST_DEBUG ("Updating program %d", program->program_number);
2216   /* Emit collection message */
2217   gst_element_post_message ((GstElement *) base,
2218       gst_message_new_stream_collection ((GstObject *) base,
2219           program->collection));
2220
2221   /* Add all streams, then fire no-more-pads */
2222   for (tmp = program->stream_list; tmp; tmp = tmp->next) {
2223     TSDemuxStream *stream = (TSDemuxStream *) tmp->data;
2224     if (!stream->pad) {
2225       activate_pad_for_stream (demux, stream);
2226       if (stream->sparse) {
2227         /* force sending of pending sticky events which have been stored on the
2228          * pad already and which otherwise would only be sent on the first buffer
2229          * or serialized event (which means very late in case of subtitle streams),
2230          * and playsink waits for stream-start or another serialized event */
2231         GST_DEBUG_OBJECT (stream->pad, "sparse stream, pushing GAP event");
2232         gst_pad_push_event (stream->pad, gst_event_new_gap (0, 0));
2233       }
2234     }
2235     if (stream->pad)
2236       gst_pad_push_event (stream->pad,
2237           gst_event_new_stream_collection (program->collection));
2238   }
2239 }
2240
2241 static void
2242 gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
2243 {
2244   GstTSDemux *demux = GST_TS_DEMUX (base);
2245
2246   GST_DEBUG ("Current program %d, new program %d requested program %d",
2247       (gint) demux->program_number, program->program_number,
2248       demux->requested_program_number);
2249
2250   if (demux->requested_program_number == program->program_number ||
2251       (demux->requested_program_number == -1 && demux->program_number == -1)) {
2252     GList *tmp;
2253     gboolean have_pads = FALSE;
2254
2255     GST_LOG ("program %d started", program->program_number);
2256     demux->program_number = program->program_number;
2257     demux->program = program;
2258
2259     /* Increment the program_generation counter */
2260     demux->program_generation = (demux->program_generation + 1) & 0xf;
2261
2262     /* Emit collection message */
2263     gst_element_post_message ((GstElement *) base,
2264         gst_message_new_stream_collection ((GstObject *) base,
2265             program->collection));
2266
2267     /* If this is not the initial program, we need to calculate
2268      * a new segment */
2269     g_mutex_lock (&demux->lock);
2270     gst_event_replace (&demux->segment_event, NULL);
2271     g_mutex_unlock (&demux->lock);
2272
2273     /* DRAIN ALL STREAMS FIRST ! */
2274     if (demux->previous_program) {
2275       GList *tmp;
2276       GST_DEBUG_OBJECT (demux, "Draining previous program");
2277       for (tmp = demux->previous_program->stream_list; tmp; tmp = tmp->next) {
2278         TSDemuxStream *stream = (TSDemuxStream *) tmp->data;
2279         if (stream->pad)
2280           gst_ts_demux_push_pending_data (demux, stream,
2281               demux->previous_program);
2282       }
2283     }
2284
2285     /* Add all streams, then fire no-more-pads */
2286     for (tmp = program->stream_list; tmp; tmp = tmp->next) {
2287       TSDemuxStream *stream = (TSDemuxStream *) tmp->data;
2288       activate_pad_for_stream (demux, stream);
2289       if (stream->pad)
2290         have_pads = TRUE;
2291     }
2292
2293     /* If there was a previous program, now is the time to deactivate it
2294      * and remove old pads (including pushing EOS) */
2295     if (demux->previous_program) {
2296       GST_DEBUG ("Deactivating previous program");
2297       mpegts_base_deactivate_and_free_program (base, demux->previous_program);
2298       demux->previous_program = NULL;
2299     }
2300
2301     if (!have_pads) {
2302       /* If we had no pads, this stream is likely corrupted or unsupported and
2303        * there's not much we can do at this point */
2304       GST_ELEMENT_ERROR (demux, STREAM, WRONG_TYPE,
2305           ("This stream contains no valid or supported streams."),
2306           ("activating program but got no pads"));
2307       return;
2308     }
2309
2310     /* If any of the stream is sparse, push a GAP event before anything else
2311      * This is done here, and not in activate_pad_for_stream() because pushing
2312      * a GAP event *is* considering data, and we want to ensure the (potential)
2313      * old pads are all removed before we push any data on the new ones */
2314     for (tmp = program->stream_list; tmp; tmp = tmp->next) {
2315       TSDemuxStream *stream = (TSDemuxStream *) tmp->data;
2316       if (stream->sparse) {
2317         /* force sending of pending sticky events which have been stored on the
2318          * pad already and which otherwise would only be sent on the first buffer
2319          * or serialized event (which means very late in case of subtitle streams),
2320          * and playsink waits for stream-start or another serialized event */
2321         GST_DEBUG_OBJECT (stream->pad, "sparse stream, pushing GAP event");
2322         gst_pad_push_event (stream->pad, gst_event_new_gap (0, 0));
2323       }
2324       if (stream->pad)
2325         gst_pad_push_event (stream->pad,
2326             gst_event_new_stream_collection (program->collection));
2327     }
2328
2329     gst_element_no_more_pads ((GstElement *) demux);
2330   }
2331 }
2332
2333 static void
2334 gst_ts_demux_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program)
2335 {
2336   GstTSDemux *demux = GST_TS_DEMUX (base);
2337
2338   if (demux->program == program) {
2339     demux->program = NULL;
2340     demux->program_number = -1;
2341   }
2342 }
2343
2344
2345 static inline void
2346 gst_ts_demux_record_pts (GstTSDemux * demux, TSDemuxStream * stream,
2347     guint64 pts, guint64 offset)
2348 {
2349   MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
2350   MpegTSBase *base = GST_MPEGTS_BASE (demux);
2351
2352   stream->raw_pts = pts;
2353   if (pts == -1) {
2354     stream->pts = GST_CLOCK_TIME_NONE;
2355     return;
2356   }
2357
2358   GST_LOG ("pid 0x%04x raw pts:%" G_GUINT64_FORMAT " at offset %"
2359       G_GUINT64_FORMAT, bs->pid, pts, offset);
2360
2361   /* Compute PTS in GstClockTime */
2362   stream->pts =
2363       mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
2364       MPEGTIME_TO_GSTTIME (pts), demux->program->pcr_pid);
2365
2366   if (base->out_segment.format == GST_FORMAT_TIME)
2367     demux->mpeg_pts_offset =
2368         (GSTTIME_TO_MPEGTIME (gst_segment_to_running_time (&base->out_segment,
2369                 GST_FORMAT_TIME, stream->pts)) - pts) & 0x1ffffffff;
2370
2371   /* Sanity check, some stream have completely different PTS vs DTS. If that
2372    * happens we only keep the DTS */
2373   if (GST_CLOCK_TIME_IS_VALID (stream->pts) &&
2374       GST_CLOCK_TIME_IS_VALID (stream->dts) &&
2375       ABSDIFF (stream->pts, stream->dts) > 5 * GST_SECOND) {
2376     GST_WARNING ("pid 0x%04x PTS %" GST_TIME_FORMAT
2377         " differs too much against DTS %" GST_TIME_FORMAT ", discarding it",
2378         bs->pid, GST_TIME_ARGS (stream->pts), GST_TIME_ARGS (stream->dts));
2379     stream->raw_pts = stream->raw_dts;
2380     stream->pts = stream->dts;
2381     return;
2382   }
2383
2384   GST_LOG ("pid 0x%04x Stored PTS %" G_GUINT64_FORMAT, bs->pid, stream->pts);
2385
2386   if (G_UNLIKELY (demux->emit_statistics)) {
2387     GstStructure *st;
2388     st = gst_structure_new_id_empty (QUARK_TSDEMUX);
2389     gst_structure_id_set (st,
2390         QUARK_PID, G_TYPE_UINT, bs->pid,
2391         QUARK_OFFSET, G_TYPE_UINT64, offset, QUARK_PTS, G_TYPE_UINT64, pts,
2392         NULL);
2393     gst_element_post_message (GST_ELEMENT_CAST (demux),
2394         gst_message_new_element (GST_OBJECT (demux), st));
2395   }
2396 }
2397
2398 static inline void
2399 gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream,
2400     guint64 dts, guint64 offset)
2401 {
2402   MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
2403
2404   stream->raw_dts = dts;
2405   if (dts == -1) {
2406     stream->dts = GST_CLOCK_TIME_NONE;
2407     return;
2408   }
2409
2410   GST_LOG ("pid 0x%04x raw dts:%" G_GUINT64_FORMAT " at offset %"
2411       G_GUINT64_FORMAT, bs->pid, dts, offset);
2412
2413   /* Compute DTS in GstClockTime */
2414   stream->dts =
2415       mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
2416       MPEGTIME_TO_GSTTIME (dts), demux->program->pcr_pid);
2417
2418   GST_LOG ("pid 0x%04x Stored DTS %" G_GUINT64_FORMAT, bs->pid, stream->dts);
2419
2420   if (G_UNLIKELY (demux->emit_statistics)) {
2421     GstStructure *st;
2422     st = gst_structure_new_id_empty (QUARK_TSDEMUX);
2423     gst_structure_id_set (st,
2424         QUARK_PID, G_TYPE_UINT, bs->pid,
2425         QUARK_OFFSET, G_TYPE_UINT64, offset, QUARK_DTS, G_TYPE_UINT64, dts,
2426         NULL);
2427     gst_element_post_message (GST_ELEMENT_CAST (demux),
2428         gst_message_new_element (GST_OBJECT (demux), st));
2429   }
2430 }
2431
2432 /* This is called when we haven't got a valid initial PTS/DTS on all streams */
2433 static gboolean
2434 check_pending_buffers (GstTSDemux * demux)
2435 {
2436   gboolean have_observation = FALSE;
2437   /* The biggest offset */
2438   guint64 offset = 0;
2439   GList *tmp;
2440   gboolean have_only_sparse = TRUE;
2441   gboolean exceeded_threshold = FALSE;
2442
2443   /* 0. Do we only have sparse stream */
2444   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
2445     TSDemuxStream *tmpstream = (TSDemuxStream *) tmp->data;
2446
2447     if (!tmpstream->sparse) {
2448       have_only_sparse = FALSE;
2449       break;
2450     }
2451   }
2452
2453   /* 1. Go over all streams */
2454   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
2455     TSDemuxStream *tmpstream = (TSDemuxStream *) tmp->data;
2456     /* 1.1 check if at least one stream got a valid DTS */
2457     if (have_only_sparse || !tmpstream->sparse) {
2458       if ((tmpstream->raw_dts != -1 && tmpstream->dts != GST_CLOCK_TIME_NONE) ||
2459           (tmpstream->raw_pts != -1 && tmpstream->pts != GST_CLOCK_TIME_NONE)) {
2460         have_observation = TRUE;
2461         break;
2462       }
2463       /* 1.2 Check if we exceeded the maximum threshold of pending data */
2464       if (tmpstream->pending && (tmpstream->raw_dts != -1
2465               || tmpstream->raw_pts != -1)) {
2466         PendingBuffer *pend = tmpstream->pending->data;
2467         guint64 lastval =
2468             tmpstream->raw_dts != -1 ? tmpstream->raw_dts : tmpstream->raw_pts;
2469         guint64 firstval = pend->dts != -1 ? pend->dts : pend->pts;
2470         GstClockTime dur;
2471         g_assert (firstval != -1);
2472         dur = MPEGTIME_TO_GSTTIME (lastval - firstval);
2473         GST_DEBUG_OBJECT (tmpstream->pad,
2474             "Pending content duration: %" GST_TIME_FORMAT, GST_TIME_ARGS (dur));
2475         if (dur > 500 * GST_MSECOND) {
2476           exceeded_threshold = TRUE;
2477           break;
2478         }
2479       }
2480     }
2481   }
2482
2483   if (have_observation == FALSE) {
2484     /* 2. If we don't have a valid value yet, break out */
2485     if (!exceeded_threshold)
2486       return FALSE;
2487
2488     /* Except if we've exceed the maximum amount of pending buffers, in which
2489      * case we ignore PCR from now on */
2490     GST_DEBUG_OBJECT (demux,
2491         "Saw more than 500ms of data without PCR. Ignoring PCR from now on");
2492     GST_MPEGTS_BASE (demux)->ignore_pcr = TRUE;
2493     demux->program->pcr_pid = 0x1fff;
2494     g_object_notify (G_OBJECT (demux), "ignore-pcr");
2495   }
2496
2497   /* 3. Go over all streams that have current/pending data */
2498   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
2499     TSDemuxStream *tmpstream = (TSDemuxStream *) tmp->data;
2500     PendingBuffer *pend;
2501     guint64 firstval, lastval, ts;
2502
2503     /* 3.1 Calculate the offset between current DTS and first DTS */
2504     if (tmpstream->pending == NULL || tmpstream->state == PENDING_PACKET_EMPTY)
2505       continue;
2506     /* If we don't have any pending data, the offset is 0 for this stream */
2507     if (tmpstream->pending == NULL)
2508       break;
2509     if (tmpstream->raw_dts != -1)
2510       lastval = tmpstream->raw_dts;
2511     else if (tmpstream->raw_pts != -1)
2512       lastval = tmpstream->raw_pts;
2513     else {
2514       GST_WARNING ("Don't have a last DTS/PTS to use for offset recalculation");
2515       continue;
2516     }
2517     pend = tmpstream->pending->data;
2518     if (pend->dts != -1)
2519       firstval = pend->dts;
2520     else if (pend->pts != -1)
2521       firstval = pend->pts;
2522     else {
2523       GST_WARNING
2524           ("Don't have a first DTS/PTS to use for offset recalculation");
2525       continue;
2526     }
2527     /* 3.2 Add to the offset the report TS for the current DTS */
2528     ts = mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
2529         MPEGTIME_TO_GSTTIME (lastval), demux->program->pcr_pid);
2530     if (ts == GST_CLOCK_TIME_NONE) {
2531       GST_WARNING ("THIS SHOULD NOT HAPPEN !");
2532       continue;
2533     }
2534     ts += MPEGTIME_TO_GSTTIME (lastval - firstval);
2535     /* 3.3 If that offset is bigger than the current offset, store it */
2536     if (ts > offset)
2537       offset = ts;
2538   }
2539
2540   GST_DEBUG ("New initial pcr_offset %" GST_TIME_FORMAT,
2541       GST_TIME_ARGS (offset));
2542
2543   /* 4. Set the offset on the packetizer */
2544   mpegts_packetizer_set_current_pcr_offset (MPEG_TS_BASE_PACKETIZER (demux),
2545       offset, demux->program->pcr_pid);
2546
2547   /* 4. Go over all streams */
2548   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
2549     TSDemuxStream *stream = (TSDemuxStream *) tmp->data;
2550
2551     stream->pending_ts = FALSE;
2552     /* 4.1 Set pending_ts for FALSE */
2553
2554     /* 4.2 Recalculate PTS/DTS (in running time) for pending data */
2555     if (stream->pending) {
2556       GList *tmp2;
2557       for (tmp2 = stream->pending; tmp2; tmp2 = tmp2->next) {
2558         PendingBuffer *pend = (PendingBuffer *) tmp2->data;
2559         if (pend->pts != -1)
2560           GST_BUFFER_PTS (pend->buffer) =
2561               mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
2562               MPEGTIME_TO_GSTTIME (pend->pts), demux->program->pcr_pid);
2563         if (pend->dts != -1)
2564           GST_BUFFER_DTS (pend->buffer) =
2565               mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
2566               MPEGTIME_TO_GSTTIME (pend->dts), demux->program->pcr_pid);
2567         /* 4.2.2 Set first_pts to TS of lowest PTS (for segment) */
2568         if (stream->first_pts == GST_CLOCK_TIME_NONE) {
2569           if (GST_BUFFER_PTS (pend->buffer) != GST_CLOCK_TIME_NONE)
2570             stream->first_pts = GST_BUFFER_PTS (pend->buffer);
2571           else if (GST_BUFFER_DTS (pend->buffer) != GST_CLOCK_TIME_NONE)
2572             stream->first_pts = GST_BUFFER_DTS (pend->buffer);
2573         }
2574       }
2575     }
2576     /* Recalculate PTS/DTS (in running time) for current data */
2577     if (stream->state != PENDING_PACKET_EMPTY) {
2578       if (stream->raw_pts != -1) {
2579         stream->pts =
2580             mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
2581             MPEGTIME_TO_GSTTIME (stream->raw_pts), demux->program->pcr_pid);
2582         if (stream->first_pts == GST_CLOCK_TIME_NONE)
2583           stream->first_pts = stream->pts;
2584       }
2585       if (stream->raw_dts != -1) {
2586         stream->dts =
2587             mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
2588             MPEGTIME_TO_GSTTIME (stream->raw_dts), demux->program->pcr_pid);
2589         if (stream->first_pts == GST_CLOCK_TIME_NONE)
2590           stream->first_pts = stream->dts;
2591       }
2592     }
2593   }
2594
2595   return TRUE;
2596 }
2597
2598 static void
2599 gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream,
2600     guint8 * data, guint32 length, guint64 bufferoffset)
2601 {
2602   PESHeader header;
2603   PESParsingResult parseres;
2604
2605   GST_MEMDUMP ("Header buffer", data, MIN (length, 32));
2606
2607   if (G_UNLIKELY (stream->pending_header_data)) {
2608     /* Accumulate with previous header if present */
2609     stream->pending_header_data =
2610         g_realloc (stream->pending_header_data,
2611         stream->pending_header_size + length);
2612     memcpy (stream->pending_header_data + stream->pending_header_size, data,
2613         length);
2614     data = stream->pending_header_data;
2615     length = stream->pending_header_size + length;
2616   }
2617
2618   parseres = mpegts_parse_pes_header (data, length, &header);
2619
2620   if (G_UNLIKELY (parseres == PES_PARSING_NEED_MORE)) {
2621     /* This can happen if PES header is bigger than a packet. */
2622     if (!stream->pending_header_data)
2623       stream->pending_header_data = g_memdup2 (data, length);
2624     stream->pending_header_size = length;
2625     return;
2626   }
2627   if (G_UNLIKELY (parseres == PES_PARSING_BAD)) {
2628     GST_WARNING ("Error parsing PES header. pid: 0x%x stream_type: 0x%x",
2629         stream->stream.pid, stream->stream.stream_type);
2630     goto discont;
2631   }
2632
2633   if (stream->target_pes_substream != 0
2634       && header.stream_id_extension != stream->target_pes_substream) {
2635     GST_DEBUG ("Skipping unwanted substream");
2636     goto discont;
2637   }
2638
2639   gst_ts_demux_record_dts (demux, stream, header.DTS, bufferoffset);
2640   gst_ts_demux_record_pts (demux, stream, header.PTS, bufferoffset);
2641   if (G_UNLIKELY (stream->pending_ts &&
2642           (stream->pts != GST_CLOCK_TIME_NONE
2643               || stream->dts != GST_CLOCK_TIME_NONE))) {
2644     GST_DEBUG ("Got pts/dts update, rechecking all streams");
2645     check_pending_buffers (demux);
2646   } else if (stream->first_pts == GST_CLOCK_TIME_NONE) {
2647     if (GST_CLOCK_TIME_IS_VALID (stream->pts))
2648       stream->first_pts = stream->pts;
2649     else if (GST_CLOCK_TIME_IS_VALID (stream->dts))
2650       stream->first_pts = stream->dts;
2651   }
2652
2653   GST_DEBUG_OBJECT (demux,
2654       "stream PTS %" GST_TIME_FORMAT " DTS %" GST_TIME_FORMAT,
2655       GST_TIME_ARGS (stream->pts), GST_TIME_ARGS (stream->dts));
2656
2657   /* Remove PES headers */
2658   GST_DEBUG ("Moving data forward by %d bytes (packet_size:%d, have:%d)",
2659       header.header_size, header.packet_length, length);
2660   stream->expected_size = header.packet_length;
2661   if (stream->expected_size) {
2662     if (G_LIKELY (stream->expected_size > header.header_size)) {
2663       stream->expected_size -= header.header_size;
2664     } else {
2665       /* next packet will have to complete this one */
2666       GST_WARNING ("invalid header and packet size combination, empty packet");
2667       stream->expected_size = 0;
2668     }
2669   }
2670   data += header.header_size;
2671   length -= header.header_size;
2672
2673   /* Create the output buffer */
2674   if (stream->expected_size)
2675     stream->allocated_size = MAX (stream->expected_size, length);
2676   else
2677     stream->allocated_size = MAX (8192, length);
2678
2679   g_assert (stream->data == NULL);
2680   stream->data = g_malloc (stream->allocated_size);
2681   memcpy (stream->data, data, length);
2682   stream->current_size = length;
2683
2684   stream->state = PENDING_PACKET_BUFFER;
2685
2686   if (stream->pending_header_data) {
2687     g_free (stream->pending_header_data);
2688     stream->pending_header_data = NULL;
2689     stream->pending_header_size = 0;
2690   }
2691
2692   return;
2693
2694 discont:
2695   if (stream->pending_header_data) {
2696     g_free (stream->pending_header_data);
2697     stream->pending_header_data = NULL;
2698     stream->pending_header_size = 0;
2699   }
2700   stream->state = PENDING_PACKET_DISCONT;
2701   return;
2702 }
2703
2704  /* ONLY CALL THIS:
2705   * * WITH packet->payload != NULL
2706   * * WITH pending/current flushed out if beginning of new PES packet
2707   */
2708 static inline void
2709 gst_ts_demux_queue_data (GstTSDemux * demux, TSDemuxStream * stream,
2710     MpegTSPacketizerPacket * packet)
2711 {
2712   guint8 *data;
2713   guint size;
2714   guint8 cc = FLAGS_CONTINUITY_COUNTER (packet->scram_afc_cc);
2715
2716   GST_LOG_OBJECT (demux, "pid: 0x%04x state:%d", stream->stream.pid,
2717       stream->state);
2718
2719   size = packet->data_end - packet->payload;
2720   data = packet->payload;
2721
2722   if (stream->continuity_counter == CONTINUITY_UNSET) {
2723     GST_DEBUG_OBJECT (demux, "CONTINUITY: Initialize to %d", cc);
2724   } else if ((cc == stream->continuity_counter + 1 ||
2725           (stream->continuity_counter == MAX_CONTINUITY && cc == 0))) {
2726     GST_LOG_OBJECT (demux, "CONTINUITY: Got expected %d", cc);
2727   } else {
2728     if (stream->state != PENDING_PACKET_EMPTY) {
2729       if (packet->payload_unit_start_indicator) {
2730         /* A mismatch is fatal, except if this is the beginning of a new
2731          * frame (from which we can recover) */
2732         if (G_UNLIKELY (stream->data)) {
2733           g_free (stream->data);
2734           stream->data = NULL;
2735         }
2736         if (G_UNLIKELY (stream->pending_header_data)) {
2737           g_free (stream->pending_header_data);
2738           stream->pending_header_data = NULL;
2739         }
2740         stream->state = PENDING_PACKET_HEADER;
2741       } else {
2742         gchar *pad_name = gst_pad_get_name (stream->pad);
2743         GST_ELEMENT_WARNING_WITH_DETAILS (demux, STREAM, DEMUX,
2744             ("CONTINUITY: Mismatch packet %d, stream %d (pid 0x%04x)", cc,
2745                 stream->continuity_counter, stream->stream.pid), (NULL),
2746             ("warning-type", G_TYPE_STRING, "continuity-mismatch",
2747                 "packet", G_TYPE_INT, cc,
2748                 "stream", G_TYPE_INT, stream->continuity_counter,
2749                 "pid", G_TYPE_UINT, stream->stream.pid,
2750                 "pad-name", G_TYPE_STRING, pad_name, NULL));
2751         g_free (pad_name);
2752         stream->state = PENDING_PACKET_DISCONT;
2753       }
2754     }
2755   }
2756   stream->continuity_counter = cc;
2757
2758   if (stream->state == PENDING_PACKET_EMPTY) {
2759     if (G_UNLIKELY (!packet->payload_unit_start_indicator)) {
2760       stream->state = PENDING_PACKET_DISCONT;
2761       GST_DEBUG_OBJECT (demux, "Didn't get the first packet of this PES");
2762     } else {
2763       GST_LOG_OBJECT (demux, "EMPTY=>HEADER");
2764       stream->state = PENDING_PACKET_HEADER;
2765     }
2766   }
2767
2768   switch (stream->state) {
2769     case PENDING_PACKET_HEADER:
2770     {
2771       GST_LOG_OBJECT (demux, "HEADER: Parsing PES header");
2772
2773       /* parse the header */
2774       gst_ts_demux_parse_pes_header (demux, stream, data, size, packet->offset);
2775       break;
2776     }
2777     case PENDING_PACKET_BUFFER:
2778     {
2779       GST_LOG_OBJECT (demux, "BUFFER: appending data");
2780       if (G_UNLIKELY (stream->current_size + size > stream->allocated_size)) {
2781         GST_LOG_OBJECT (demux, "resizing buffer");
2782         do {
2783           stream->allocated_size = MAX (8192, 2 * stream->allocated_size);
2784         } while (stream->current_size + size > stream->allocated_size);
2785         stream->data = g_realloc (stream->data, stream->allocated_size);
2786       }
2787       memcpy (stream->data + stream->current_size, data, size);
2788       stream->current_size += size;
2789       break;
2790     }
2791     case PENDING_PACKET_DISCONT:
2792     {
2793       GST_LOG_OBJECT (demux, "DISCONT: not storing/pushing");
2794       if (G_UNLIKELY (stream->data)) {
2795         g_free (stream->data);
2796         stream->data = NULL;
2797       }
2798       if (G_UNLIKELY (stream->pending_header_data)) {
2799         g_free (stream->pending_header_data);
2800         stream->pending_header_data = NULL;
2801       }
2802       stream->continuity_counter = CONTINUITY_UNSET;
2803       break;
2804     }
2805     default:
2806       break;
2807   }
2808
2809   return;
2810 }
2811
2812 static void
2813 calculate_and_push_newsegment (GstTSDemux * demux, TSDemuxStream * stream,
2814     MpegTSBaseProgram * target_program)
2815 {
2816   MpegTSBase *base = (MpegTSBase *) demux;
2817   GstClockTime lowest_pts = GST_CLOCK_TIME_NONE;
2818   GstClockTime firstts = 0;
2819   GList *tmp;
2820
2821   GST_DEBUG_OBJECT (demux, "Creating new newsegment for stream %p", stream);
2822
2823   if (target_program == NULL)
2824     target_program = demux->program;
2825
2826   /* Speedup : if we don't need to calculate anything, go straight to pushing */
2827   g_mutex_lock (&demux->lock);
2828   if (demux->segment_event) {
2829     g_mutex_unlock (&demux->lock);
2830     goto push_new_segment;
2831   }
2832   g_mutex_unlock (&demux->lock);
2833
2834   /* Calculate the 'new_start' value, used for newsegment */
2835   for (tmp = target_program->stream_list; tmp; tmp = tmp->next) {
2836     TSDemuxStream *pstream = (TSDemuxStream *) tmp->data;
2837
2838     if (GST_CLOCK_TIME_IS_VALID (pstream->first_pts)) {
2839       if (!GST_CLOCK_TIME_IS_VALID (lowest_pts)
2840           || pstream->first_pts < lowest_pts)
2841         lowest_pts = pstream->first_pts;
2842     }
2843   }
2844   if (GST_CLOCK_TIME_IS_VALID (lowest_pts))
2845     firstts = lowest_pts;
2846   GST_DEBUG_OBJECT (demux, "lowest_pts %" G_GUINT64_FORMAT " => clocktime %"
2847       GST_TIME_FORMAT, lowest_pts, GST_TIME_ARGS (firstts));
2848
2849   if (base->out_segment.format != GST_FORMAT_TIME || demux->reset_segment) {
2850     /* It will happen only if it's first program or after flushes. */
2851     GST_DEBUG_OBJECT (demux, "Calculating actual segment");
2852     if (base->segment.format == GST_FORMAT_TIME) {
2853       /* Try to recover segment info from base if it's in TIME format */
2854       base->out_segment = base->segment;
2855     } else {
2856       /* Start from the first ts/pts */
2857       GstSegment *seg = &base->out_segment;
2858       GstClockTime base =
2859           seg->base + seg->position - (seg->start + seg->offset);
2860       GstClockTime stop = seg->stop;
2861
2862       gst_segment_init (seg, GST_FORMAT_TIME);
2863       seg->start = firstts;
2864       seg->stop = MAX (seg->start, stop);
2865       seg->position = firstts;
2866       seg->time = firstts;
2867       seg->rate = demux->rate;
2868       seg->base = base;
2869     }
2870   } else if (base->out_segment.start < firstts) {
2871     /* Take into account the offset to the first buffer timestamp */
2872     if (base->out_segment.rate > 0) {
2873       if (GST_CLOCK_TIME_IS_VALID (base->out_segment.stop))
2874         base->out_segment.stop += firstts - base->out_segment.start;
2875       base->out_segment.start = firstts;
2876       base->out_segment.position = firstts;
2877     }
2878   }
2879
2880   GST_LOG_OBJECT (demux, "Output segment now %" GST_SEGMENT_FORMAT,
2881       &base->out_segment);
2882
2883   g_mutex_lock (&demux->lock);
2884   if (!demux->segment_event) {
2885     gst_event_take (&demux->segment_event,
2886         gst_event_new_segment (&base->out_segment));
2887
2888     if (base->last_seek_seqnum != GST_SEQNUM_INVALID)
2889       gst_event_set_seqnum (demux->segment_event, base->last_seek_seqnum);
2890   }
2891   g_mutex_unlock (&demux->lock);
2892
2893 push_new_segment:
2894   for (tmp = target_program->stream_list; tmp; tmp = tmp->next) {
2895     stream = (TSDemuxStream *) tmp->data;
2896     if (stream->pad == NULL)
2897       continue;
2898
2899     g_mutex_lock (&demux->lock);
2900     if (demux->segment_event) {
2901       GstEvent *evt = gst_event_ref (demux->segment_event);
2902       GST_DEBUG_OBJECT (stream->pad, "Pushing newsegment event");
2903
2904       g_mutex_unlock (&demux->lock);
2905       gst_pad_push_event (stream->pad, evt);
2906     } else {
2907       g_mutex_unlock (&demux->lock);
2908     }
2909
2910     if (demux->global_tags) {
2911       gst_pad_push_event (stream->pad,
2912           gst_event_new_tag (gst_tag_list_ref (demux->global_tags)));
2913     }
2914
2915     /* Push pending tags */
2916     if (stream->taglist) {
2917       GST_DEBUG_OBJECT (stream->pad, "Sending tags %" GST_PTR_FORMAT,
2918           stream->taglist);
2919       gst_pad_push_event (stream->pad, gst_event_new_tag (stream->taglist));
2920       stream->taglist = NULL;
2921     }
2922
2923     stream->need_newsegment = FALSE;
2924   }
2925   if (base->seek_event) {
2926     g_assert (base->out_segment.format != GST_FORMAT_UNDEFINED);
2927     gst_ts_demux_do_seek (base, base->seek_event);
2928     gst_event_replace (&base->seek_event, NULL);
2929   }
2930 }
2931
2932 static void
2933 gst_ts_demux_check_and_sync_streams (GstTSDemux * demux, GstClockTime time)
2934 {
2935   GList *tmp;
2936
2937   GST_DEBUG_OBJECT (demux,
2938       "Recheck streams and sync to at least: %" GST_TIME_FORMAT,
2939       GST_TIME_ARGS (time));
2940
2941   if (G_UNLIKELY (demux->program == NULL))
2942     return;
2943
2944   /* Go over each stream and update it to at least 'time' time.
2945    * For each stream, the pad stores the buffer counter the last time
2946    * a gap check occurred (gap_ref_buffers) and a gap_ref_pts timestamp
2947    * that is either the PTS from the stream or the PCR the pad was updated
2948    * to.
2949    *
2950    * We can check nb_out_buffers to see if any buffers were pushed since then.
2951    * This means we can detect buffers passing without PTSes fine and still generate
2952    * gaps.
2953    *
2954    * If there haven't been any buffers pushed on this stream since the last gap
2955    * check *AND* there is no pending data (stream->current_size), push a gap
2956    * event updating to the indicated input PCR time and update the pad's
2957    * tracking.
2958    *
2959    * If there have been buffers pushed, update the reference buffer count
2960    * and but don't push a gap event
2961    */
2962   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
2963     TSDemuxStream *ps = (TSDemuxStream *) tmp->data;
2964     GST_DEBUG_OBJECT (ps->pad,
2965         "0x%04x, PTS:%" GST_TIME_FORMAT " REFPTS:%" GST_TIME_FORMAT " Gap:%"
2966         GST_TIME_FORMAT " nb_buffers: %d (ref:%d) pending_data size %u",
2967         ((MpegTSBaseStream *) ps)->pid, GST_TIME_ARGS (ps->pts),
2968         GST_TIME_ARGS (ps->gap_ref_pts),
2969         GST_TIME_ARGS (ps->pts - ps->gap_ref_pts), ps->nb_out_buffers,
2970         ps->gap_ref_buffers, ps->current_size);
2971     if (ps->pad == NULL)
2972       continue;
2973
2974     if (ps->nb_out_buffers == ps->gap_ref_buffers && ps->current_size == 0
2975         && ps->gap_ref_pts != ps->pts) {
2976       /* Do initial setup of pad if needed - segment etc */
2977       GST_DEBUG_OBJECT (ps->pad,
2978           "Stream needs update. Pushing GAP event to TS %" GST_TIME_FORMAT,
2979           GST_TIME_ARGS (time));
2980       if (G_UNLIKELY (ps->need_newsegment))
2981         calculate_and_push_newsegment (demux, ps, NULL);
2982
2983       /* Now send gap event */
2984       gst_pad_push_event (ps->pad, gst_event_new_gap (time, 0));
2985     }
2986
2987     /* Update GAP tracking vars so we don't re-check this stream for a while */
2988     ps->gap_ref_pts = time;
2989     if (ps->pts != GST_CLOCK_TIME_NONE && ps->pts > time)
2990       ps->gap_ref_pts = ps->pts;
2991     ps->gap_ref_buffers = ps->nb_out_buffers;
2992   }
2993 }
2994
2995 static GstBufferList *
2996 parse_opus_access_unit (TSDemuxStream * stream)
2997 {
2998   GstByteReader reader;
2999   GstBufferList *buffer_list = NULL;
3000
3001   buffer_list = gst_buffer_list_new ();
3002   gst_byte_reader_init (&reader, stream->data, stream->current_size);
3003
3004   do {
3005     GstBuffer *buffer;
3006     guint16 id;
3007     guint au_size = 0;
3008     guint8 b;
3009     gboolean start_trim_flag, end_trim_flag, control_extension_flag;
3010     guint16 start_trim = 0, end_trim = 0;
3011     guint8 *packet_data;
3012     guint packet_size;
3013
3014     if (!gst_byte_reader_get_uint16_be (&reader, &id))
3015       goto error;
3016
3017     /* No control header */
3018     if ((id >> 5) != 0x3ff)
3019       goto error;
3020
3021     do {
3022       if (!gst_byte_reader_get_uint8 (&reader, &b))
3023         goto error;
3024       au_size += b;
3025     } while (b == 0xff);
3026
3027     start_trim_flag = (id >> 4) & 0x1;
3028     end_trim_flag = (id >> 3) & 0x1;
3029     control_extension_flag = (id >> 2) & 0x1;
3030
3031     if (start_trim_flag) {
3032       if (!gst_byte_reader_get_uint16_be (&reader, &start_trim))
3033         goto error;
3034     }
3035
3036     if (end_trim_flag) {
3037       if (!gst_byte_reader_get_uint16_be (&reader, &end_trim))
3038         goto error;
3039     }
3040
3041     if (control_extension_flag) {
3042       if (!gst_byte_reader_get_uint8 (&reader, &b))
3043         goto error;
3044
3045       if (!gst_byte_reader_skip (&reader, b))
3046         goto error;
3047     }
3048
3049     packet_size = au_size;
3050
3051     /* FIXME: this should be
3052      *   packet_size = au_size - gst_byte_reader_get_pos (&reader);
3053      * but ffmpeg and the only available sample stream from obe.tv
3054      * are not including the control header size in au_size
3055      */
3056     if (gst_byte_reader_get_remaining (&reader) < packet_size)
3057       goto error;
3058     if (!gst_byte_reader_dup_data (&reader, packet_size, &packet_data))
3059       goto error;
3060
3061     buffer = gst_buffer_new_wrapped (packet_data, packet_size);
3062
3063     if (start_trim != 0 || end_trim != 0) {
3064       gst_buffer_add_audio_clipping_meta (buffer, GST_FORMAT_DEFAULT,
3065           start_trim, end_trim);
3066     }
3067
3068     gst_buffer_list_add (buffer_list, buffer);
3069   } while (gst_byte_reader_get_remaining (&reader) > 0);
3070
3071   g_free (stream->data);
3072   stream->data = NULL;
3073   stream->current_size = 0;
3074
3075   return buffer_list;
3076
3077 error:
3078   {
3079     GST_ERROR ("Failed to parse Opus access unit");
3080     g_free (stream->data);
3081     stream->data = NULL;
3082     stream->current_size = 0;
3083     if (buffer_list)
3084       gst_buffer_list_unref (buffer_list);
3085     return NULL;
3086   }
3087 }
3088
3089 /* interlaced mode is disabled at the moment */
3090 /*#define TSDEMUX_JP2K_SUPPORT_INTERLACE */
3091 static GstBuffer *
3092 parse_jp2k_access_unit (TSDemuxStream * stream)
3093 {
3094   GstByteReader reader;
3095   /* header tag */
3096   guint32 header_tag;
3097   /* Framerate box */
3098   guint16 den G_GNUC_UNUSED;
3099   guint16 num G_GNUC_UNUSED;
3100   /* Maximum bitrate box */
3101   guint32 MaxBr G_GNUC_UNUSED;
3102   guint32 AUF[2] = { 0, 0 };
3103 #ifdef TSDEMUX_JP2K_SUPPORT_INTERLACE
3104   /* Field Coding Box */
3105   guint8 Fic G_GNUC_UNUSED = 1;
3106   guint8 Fio G_GNUC_UNUSED = 0;
3107   /* header size equals 38 for non-interlaced, and 48 for interlaced */
3108   guint header_size = stream->jp2kInfos.interlace ? 48 : 38;
3109 #else
3110   /* header size equals 38 for non-interlaced, and 48 for interlaced */
3111   guint header_size = 38;
3112 #endif
3113   /* Time Code box */
3114   guint32 HHMMSSFF G_GNUC_UNUSED;
3115   /* Broadcast color box */
3116   guint8 CollC G_GNUC_UNUSED;
3117   guint8 b G_GNUC_UNUSED;
3118
3119   guint data_location;
3120   GstBuffer *retbuf = NULL;
3121
3122   if (stream->current_size < header_size) {
3123     GST_ERROR_OBJECT (stream->pad, "Not enough data for header");
3124     goto error;
3125   }
3126
3127   gst_byte_reader_init (&reader, stream->data, stream->current_size);
3128
3129   /* Check for the location of the jp2k magic */
3130   data_location =
3131       gst_byte_reader_masked_scan_uint32 (&reader, 0xffffffff, 0xff4fff51, 0,
3132       stream->current_size);
3133   GST_DEBUG_OBJECT (stream->pad, "data location %d", data_location);
3134   if (data_location == -1) {
3135     GST_ERROR_OBJECT (stream->pad, "Stream does not contain jp2k magic header");
3136     goto error;
3137   }
3138
3139   /* Elementary stream header box 'elsm' == 0x656c736d */
3140   header_tag = gst_byte_reader_get_uint32_be_unchecked (&reader);
3141   if (header_tag != 0x656c736d) {
3142     GST_ERROR_OBJECT (stream->pad, "Expected ELSM box but found box %x instead",
3143         header_tag);
3144     goto error;
3145   }
3146   /* Frame rate box 'frat' == 0x66726174 */
3147   header_tag = gst_byte_reader_get_uint32_be_unchecked (&reader);
3148   if (header_tag != 0x66726174) {
3149     GST_ERROR_OBJECT (stream->pad,
3150         "Expected frame rate box, but found box %x instead", header_tag);
3151     goto error;
3152
3153   }
3154   den = gst_byte_reader_get_uint16_be_unchecked (&reader);
3155   num = gst_byte_reader_get_uint16_be_unchecked (&reader);
3156   /* Maximum bit rate box 'brat' == 0x62726174 */
3157   header_tag = gst_byte_reader_get_uint32_be_unchecked (&reader);
3158   if (header_tag != 0x62726174) {
3159     GST_ERROR_OBJECT (stream->pad, "Expected brat box but read box %x instead",
3160         header_tag);
3161     goto error;
3162
3163   }
3164   MaxBr = gst_byte_reader_get_uint32_be_unchecked (&reader);
3165   AUF[0] = gst_byte_reader_get_uint32_be_unchecked (&reader);
3166   if (stream->jp2kInfos.interlace) {
3167 #ifdef TSDEMUX_JP2K_SUPPORT_INTERLACE
3168     AUF[1] = gst_byte_reader_get_uint32_be_unchecked (&reader);
3169     /*  Field Coding Box 'fiel' == 0x6669656c */
3170     header_tag = gst_byte_reader_get_uint32_be_unchecked (&reader);
3171     if (header_tag != 0x6669656c) {
3172       GST_ERROR_OBJECT (stream->pad,
3173           "Expected Field Coding box but found box %x instead", header_tag);
3174       goto error;
3175     }
3176     Fic = gst_byte_reader_get_uint8_unchecked (&reader);
3177     Fio = gst_byte_reader_get_uint8_unchecked (&reader);
3178 #else
3179     GST_ERROR_OBJECT (stream->pad, "interlaced mode not supported");
3180     goto error;
3181 #endif
3182   }
3183
3184   /* Time Code Box 'tcod' == 0x74636f64 */
3185   /* Some progressive streams might have a AUF[1] of value 0 present */
3186   header_tag = gst_byte_reader_get_uint32_be_unchecked (&reader);
3187   if (header_tag == 0 && !stream->jp2kInfos.interlace) {
3188     AUF[1] = header_tag;
3189     header_tag = gst_byte_reader_get_uint32_be_unchecked (&reader);
3190     /* Bump up header size and recheck */
3191     header_size += 4;
3192     if (stream->current_size < header_size) {
3193       GST_ERROR_OBJECT (stream->pad, "Not enough data for header");
3194       goto error;
3195     }
3196   }
3197   if (header_tag != 0x74636f64) {
3198     GST_ERROR_OBJECT (stream->pad,
3199         "Expected Time code box but found %d box instead", header_tag);
3200     goto error;
3201   }
3202   HHMMSSFF = gst_byte_reader_get_uint32_be_unchecked (&reader);
3203   /* Broadcast Color Box 'bcol' == 0x6263686c */
3204   header_tag = gst_byte_reader_get_uint32_be_unchecked (&reader);
3205   if (header_tag != 0x62636f6c) {
3206     GST_ERROR_OBJECT (stream->pad,
3207         "Expected Broadcast color box but found %x box instead", header_tag);
3208     goto error;
3209   }
3210   CollC = gst_byte_reader_get_uint8_unchecked (&reader);
3211   b = gst_byte_reader_get_uint8_unchecked (&reader);
3212
3213   /* Check if we have enough data to create a valid buffer */
3214   if ((stream->current_size - data_location) < (AUF[0] + AUF[1])) {
3215     GST_ERROR_OBJECT
3216         (stream->pad,
3217         "Required size (%d) greater than remaining size in buffer (%d)",
3218         AUF[0] + AUF[1], (stream->current_size - data_location));
3219     if (stream->expected_size && stream->current_size != stream->expected_size) {
3220       /* warn if buffer is truncated due to draining */
3221       GST_WARNING_OBJECT
3222           (stream->pad,
3223           "Truncated buffer: current size (%d) doesn't match expected size (%d)",
3224           stream->current_size, stream->expected_size);
3225     } else {
3226       /* kill pipeline if either we don't know what expected size is, or
3227        * we know the expected size, and thus are sure that the buffer is not
3228        *  truncated due to draining */
3229       goto error;
3230     }
3231   }
3232
3233   retbuf = gst_buffer_new_wrapped_full (0, stream->data, stream->current_size,
3234       data_location, stream->current_size - data_location,
3235       stream->data, g_free);
3236   stream->data = NULL;
3237   stream->current_size = 0;
3238   return retbuf;
3239
3240 error:
3241   GST_ERROR ("Failed to parse JP2K access unit");
3242   g_free (stream->data);
3243   stream->data = NULL;
3244   stream->current_size = 0;
3245   return NULL;
3246 }
3247
3248 static GstBuffer *
3249 parse_aac_adts_frame (TSDemuxStream * stream)
3250 {
3251   gint data_location = -1;
3252   guint frame_len;
3253   guint crc_size;
3254   guint mpegversion = 4;
3255   gint i;
3256
3257   if (stream->current_size < 6) {
3258     GST_DEBUG_OBJECT (stream->pad, "Not enough data for header");
3259     goto out;
3260   }
3261
3262   /* check syncword */
3263   for (i = 0; i < stream->current_size - 2; i++) {
3264     if ((stream->data[i] == 0xff) && ((stream->data[i + 1] & 0xf6) == 0xf0)) {
3265       data_location = i;
3266       break;
3267     }
3268   }
3269
3270   GST_TRACE_OBJECT (stream->pad, "data location %d", data_location);
3271
3272   if (data_location == -1) {
3273     GST_DEBUG_OBJECT (stream->pad, "Stream does not contain adts syncword");
3274     goto out;
3275   }
3276
3277   if (stream->current_size - data_location < 6) {
3278     GST_DEBUG_OBJECT (stream->pad, "Not enough data for header");
3279     goto out;
3280   }
3281
3282   frame_len = ((stream->data[data_location + 3] & 0x03) << 11) |
3283       (stream->data[data_location + 4] << 3) | ((stream->data[data_location +
3284               5] & 0xe0) >> 5);
3285
3286   crc_size = (stream->data[data_location + 1] & 0x01) ? 0 : 2;
3287
3288   if (frame_len < 7 + crc_size) {
3289     GST_DEBUG_OBJECT (stream->pad, "Invalid frame len %d", frame_len);
3290     goto out;
3291   }
3292
3293   /* this seems to be valid adts header, check mpeg version now
3294    *
3295    * TODO: check channels, rate, and profile and then update caps too?
3296    */
3297   mpegversion = (stream->data[data_location + 1] & 0x08) ? 2 : 4;
3298
3299 out:
3300   if (mpegversion != stream->atdsInfos.mpegversion) {
3301     GstCaps *caps;
3302     MpegTSBaseStream *bstream = (MpegTSBaseStream *) stream;
3303
3304     GST_DEBUG_OBJECT (stream->pad, "Update mpegversion from %d to %d",
3305         stream->atdsInfos.mpegversion, mpegversion);
3306     stream->atdsInfos.mpegversion = mpegversion;
3307
3308     caps = gst_stream_get_caps (bstream->stream_object);
3309     caps = gst_caps_make_writable (caps);
3310
3311     gst_caps_set_simple (caps, "mpegversion", G_TYPE_INT, mpegversion, NULL);
3312     gst_stream_set_caps (bstream->stream_object, caps);
3313     gst_pad_set_caps (stream->pad, caps);
3314     gst_caps_unref (caps);
3315   }
3316
3317   return gst_buffer_new_wrapped (stream->data, stream->current_size);
3318 }
3319
3320
3321 static GstFlowReturn
3322 gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream,
3323     MpegTSBaseProgram * target_program)
3324 {
3325   MpegTSBase *base = GST_MPEGTS_BASE (demux);
3326   GstFlowReturn res = GST_FLOW_OK;
3327   MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
3328   GstBuffer *buffer = NULL;
3329   GstBufferList *buffer_list = NULL;
3330
3331
3332   GST_DEBUG_OBJECT (stream->pad,
3333       "stream:%p, pid:0x%04x stream_type:%d state:%d", stream, bs->pid,
3334       bs->stream_type, stream->state);
3335
3336   if (G_UNLIKELY (stream->data == NULL)) {
3337     GST_LOG_OBJECT (stream->pad, "stream->data == NULL");
3338     goto beach;
3339   }
3340
3341   if (G_UNLIKELY (stream->state == PENDING_PACKET_EMPTY)) {
3342     GST_LOG_OBJECT (stream->pad, "EMPTY: returning");
3343     goto beach;
3344   }
3345
3346   if (G_UNLIKELY (stream->state != PENDING_PACKET_BUFFER)) {
3347     GST_LOG_OBJECT (stream->pad, "state:%d, returning", stream->state);
3348     goto beach;
3349   }
3350
3351   if (G_UNLIKELY (demux->program == NULL)) {
3352     GST_LOG_OBJECT (demux, "No program");
3353     g_free (stream->data);
3354     goto beach;
3355   }
3356
3357   if (stream->needs_keyframe) {
3358     MpegTSBase *base = (MpegTSBase *) demux;
3359
3360     if ((gst_ts_demux_adjust_seek_offset_for_keyframe (stream, stream->data,
3361                 stream->current_size)) || demux->last_seek_offset == 0) {
3362       GST_DEBUG_OBJECT (stream->pad,
3363           "Got Keyframe, ready to go at %" GST_TIME_FORMAT,
3364           GST_TIME_ARGS (stream->pts));
3365
3366       if (bs->stream_type == GST_MPEGTS_STREAM_TYPE_PRIVATE_PES_PACKETS &&
3367           bs->registration_id == DRF_ID_OPUS) {
3368         buffer_list = parse_opus_access_unit (stream);
3369         if (!buffer_list) {
3370           res = GST_FLOW_ERROR;
3371           goto beach;
3372         }
3373
3374         if (gst_buffer_list_length (buffer_list) == 1) {
3375           buffer = gst_buffer_ref (gst_buffer_list_get (buffer_list, 0));
3376           gst_buffer_list_unref (buffer_list);
3377           buffer_list = NULL;
3378         }
3379       } else if (bs->stream_type == GST_MPEGTS_STREAM_TYPE_VIDEO_JP2K) {
3380         buffer = parse_jp2k_access_unit (stream);
3381         if (!buffer) {
3382           res = GST_FLOW_ERROR;
3383           goto beach;
3384         }
3385       } else {
3386         buffer = gst_buffer_new_wrapped (stream->data, stream->current_size);
3387       }
3388
3389       stream->seeked_pts = stream->pts;
3390       stream->seeked_dts = stream->dts;
3391       stream->needs_keyframe = FALSE;
3392     } else {
3393       GList *tmp;
3394       GST_DEBUG_OBJECT (stream->pad, "Rewinding after keyframe seek failure");
3395       base->seek_offset = demux->last_seek_offset - 200 * base->packetsize;
3396       if (demux->last_seek_offset < 200 * base->packetsize)
3397         base->seek_offset = 0;
3398       demux->last_seek_offset = base->seek_offset;
3399       mpegts_packetizer_flush (base->packetizer, FALSE);
3400
3401       /* Reset all streams accordingly */
3402       for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
3403         TSDemuxStream *cand = tmp->data;
3404
3405         GST_DEBUG_OBJECT (cand->pad, "Clearing stream");
3406         cand->continuity_counter = CONTINUITY_UNSET;
3407         cand->state = PENDING_PACKET_EMPTY;
3408         if (cand->data)
3409           g_free (cand->data);
3410         cand->data = NULL;
3411         cand->allocated_size = 0;
3412         cand->current_size = 0;
3413       }
3414       base->mode = BASE_MODE_SEEKING;
3415
3416       res = GST_FLOW_REWINDING;
3417       goto beach;
3418     }
3419   } else {
3420     if (bs->stream_type == GST_MPEGTS_STREAM_TYPE_PRIVATE_PES_PACKETS &&
3421         bs->registration_id == DRF_ID_OPUS) {
3422       buffer_list = parse_opus_access_unit (stream);
3423       if (!buffer_list) {
3424         res = GST_FLOW_ERROR;
3425         goto beach;
3426       }
3427
3428       if (gst_buffer_list_length (buffer_list) == 1) {
3429         buffer = gst_buffer_ref (gst_buffer_list_get (buffer_list, 0));
3430         gst_buffer_list_unref (buffer_list);
3431         buffer_list = NULL;
3432       }
3433     } else if (bs->stream_type == GST_MPEGTS_STREAM_TYPE_VIDEO_JP2K) {
3434       buffer = parse_jp2k_access_unit (stream);
3435       if (!buffer) {
3436         res = GST_FLOW_ERROR;
3437         goto beach;
3438       }
3439     } else if (bs->stream_type == GST_MPEGTS_STREAM_TYPE_AUDIO_AAC_ADTS) {
3440       buffer = parse_aac_adts_frame (stream);
3441       if (!buffer) {
3442         res = GST_FLOW_ERROR;
3443         goto beach;
3444       }
3445     } else {
3446       buffer = gst_buffer_new_wrapped (stream->data, stream->current_size);
3447     }
3448
3449     if (G_UNLIKELY (stream->pending_ts && !check_pending_buffers (demux))) {
3450       if (buffer) {
3451         PendingBuffer *pend;
3452         pend = g_slice_new0 (PendingBuffer);
3453         pend->buffer = buffer;
3454         pend->pts = stream->raw_pts;
3455         pend->dts = stream->raw_dts;
3456         stream->pending = g_list_append (stream->pending, pend);
3457       } else {
3458         guint i, n;
3459
3460         n = gst_buffer_list_length (buffer_list);
3461         for (i = 0; i < n; i++) {
3462           PendingBuffer *pend;
3463           pend = g_slice_new0 (PendingBuffer);
3464           pend->buffer = gst_buffer_ref (gst_buffer_list_get (buffer_list, i));
3465           pend->pts = i == 0 ? stream->raw_pts : -1;
3466           pend->dts = i == 0 ? stream->raw_dts : -1;
3467           stream->pending = g_list_append (stream->pending, pend);
3468         }
3469         gst_buffer_list_unref (buffer_list);
3470       }
3471       GST_DEBUG_OBJECT (demux,
3472           "Not enough information to push buffers yet, storing buffer");
3473       goto beach;
3474     }
3475   }
3476
3477   if (G_UNLIKELY (stream->need_newsegment))
3478     calculate_and_push_newsegment (demux, stream, target_program);
3479
3480   /* FIXME : Push pending buffers if any */
3481   if (G_UNLIKELY (stream->pending)) {
3482     GList *tmp;
3483     for (tmp = stream->pending; tmp; tmp = tmp->next) {
3484       PendingBuffer *pend = (PendingBuffer *) tmp->data;
3485
3486       GST_DEBUG_OBJECT (stream->pad,
3487           "Pushing pending buffer PTS:%" GST_TIME_FORMAT " DTS:%"
3488           GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (pend->buffer)),
3489           GST_TIME_ARGS (GST_BUFFER_DTS (pend->buffer)));
3490
3491       if (stream->discont)
3492         GST_BUFFER_FLAG_SET (pend->buffer, GST_BUFFER_FLAG_DISCONT);
3493       stream->discont = FALSE;
3494
3495       res = gst_pad_push (stream->pad, pend->buffer);
3496       stream->nb_out_buffers += 1;
3497       g_slice_free (PendingBuffer, pend);
3498     }
3499     g_list_free (stream->pending);
3500     stream->pending = NULL;
3501   }
3502
3503   if ((GST_CLOCK_TIME_IS_VALID (stream->seeked_pts)
3504           && stream->pts < stream->seeked_pts) ||
3505       (GST_CLOCK_TIME_IS_VALID (stream->seeked_dts) &&
3506           stream->pts < stream->seeked_dts)) {
3507     GST_INFO_OBJECT (stream->pad,
3508         "Droping with PTS: %" GST_TIME_FORMAT " DTS: %" GST_TIME_FORMAT
3509         " after seeking as other stream needed to be seeked further"
3510         "(seeked PTS: %" GST_TIME_FORMAT " DTS: %" GST_TIME_FORMAT ")",
3511         GST_TIME_ARGS (stream->pts), GST_TIME_ARGS (stream->dts),
3512         GST_TIME_ARGS (stream->seeked_pts), GST_TIME_ARGS (stream->seeked_dts));
3513     if (buffer)
3514       gst_buffer_unref (buffer);
3515     if (buffer_list)
3516       gst_buffer_list_unref (buffer_list);
3517     goto beach;
3518   }
3519
3520   GST_DEBUG_OBJECT (stream->pad, "stream->pts %" GST_TIME_FORMAT,
3521       GST_TIME_ARGS (stream->pts));
3522
3523   /* Decorate buffer or first buffer of the buffer list */
3524   if (buffer_list)
3525     buffer = gst_buffer_list_get (buffer_list, 0);
3526
3527   if (GST_CLOCK_TIME_IS_VALID (stream->pts))
3528     GST_BUFFER_PTS (buffer) = GST_BUFFER_DTS (buffer) = stream->pts;
3529   /* DTS = PTS by default, we override it if there's a real DTS */
3530   if (GST_CLOCK_TIME_IS_VALID (stream->dts))
3531     GST_BUFFER_DTS (buffer) = stream->dts;
3532
3533   if (stream->discont)
3534     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
3535   stream->discont = FALSE;
3536
3537   if (buffer_list)
3538     buffer = NULL;
3539
3540   GST_DEBUG_OBJECT (stream->pad,
3541       "Pushing buffer%s with PTS: %" GST_TIME_FORMAT " , DTS: %"
3542       GST_TIME_FORMAT, (buffer_list ? "list" : ""), GST_TIME_ARGS (stream->pts),
3543       GST_TIME_ARGS (stream->dts));
3544
3545   if (GST_CLOCK_TIME_IS_VALID (stream->dts)) {
3546     if (stream->dts > base->out_segment.position)
3547       base->out_segment.position = stream->dts;
3548   } else if (GST_CLOCK_TIME_IS_VALID (stream->pts)) {
3549     if (stream->pts > base->out_segment.position)
3550       base->out_segment.position = stream->pts;
3551   }
3552
3553   if (buffer) {
3554     res = gst_pad_push (stream->pad, buffer);
3555     /* Record that a buffer was pushed */
3556     stream->nb_out_buffers += 1;
3557   } else {
3558     guint n = gst_buffer_list_length (buffer_list);
3559     res = gst_pad_push_list (stream->pad, buffer_list);
3560     /* Record that a buffer was pushed */
3561     stream->nb_out_buffers += n;
3562   }
3563   GST_DEBUG_OBJECT (stream->pad, "Returned %s", gst_flow_get_name (res));
3564   res = gst_flow_combiner_update_flow (demux->flowcombiner, res);
3565   GST_DEBUG_OBJECT (stream->pad, "combined %s", gst_flow_get_name (res));
3566
3567   /* GAP / sparse stream tracking */
3568   if (G_UNLIKELY (stream->gap_ref_pts == GST_CLOCK_TIME_NONE))
3569     stream->gap_ref_pts = stream->pts;
3570   else {
3571     /* Look if the stream PTS has advanced 2 seconds since the last
3572      * gap check, and sync streams if it has. The first stream to
3573      * hit this will trigger a gap check */
3574     if (G_UNLIKELY (stream->pts != GST_CLOCK_TIME_NONE &&
3575             stream->pts > stream->gap_ref_pts + 2 * GST_SECOND)) {
3576       if (demux->program->pcr_pid != 0x1fff) {
3577         GstClockTime curpcr =
3578             mpegts_packetizer_get_current_time (MPEG_TS_BASE_PACKETIZER (demux),
3579             demux->program->pcr_pid);
3580         if (curpcr == GST_CLOCK_TIME_NONE || curpcr < 800 * GST_MSECOND)
3581           goto beach;
3582         curpcr -= 800 * GST_MSECOND;
3583         /* Use the current PCR (with a safety margin) to sync against */
3584         gst_ts_demux_check_and_sync_streams (demux, curpcr);
3585       } else {
3586         /* If we don't have a PCR track, just use the current stream PTS */
3587         gst_ts_demux_check_and_sync_streams (demux, stream->pts);
3588       }
3589     }
3590   }
3591
3592 beach:
3593   /* Reset the PES payload collection, but don't clear the state,
3594    * we might want to keep collecting this PES */
3595   GST_LOG_OBJECT (demux, "Cleared PES data. returning %s",
3596       gst_flow_get_name (res));
3597   if (stream->expected_size) {
3598     if (stream->current_size > stream->expected_size)
3599       stream->expected_size = 0;
3600     else
3601       stream->expected_size -= stream->current_size;
3602   }
3603   stream->data = NULL;
3604   stream->allocated_size = 0;
3605   stream->current_size = 0;
3606
3607   return res;
3608 }
3609
3610 static GstFlowReturn
3611 gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream,
3612     MpegTSPacketizerPacket * packet, GstMpegtsSection * section)
3613 {
3614   GstFlowReturn res = GST_FLOW_OK;
3615
3616   GST_LOG_OBJECT (demux, "pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p",
3617       packet->pid, packet->payload_unit_start_indicator,
3618       packet->scram_afc_cc & 0x30,
3619       FLAGS_CONTINUITY_COUNTER (packet->scram_afc_cc), packet->payload);
3620
3621   if (G_UNLIKELY (packet->payload_unit_start_indicator) &&
3622       FLAGS_HAS_PAYLOAD (packet->scram_afc_cc)) {
3623     /* Flush previous data */
3624     res = gst_ts_demux_push_pending_data (demux, stream, NULL);
3625     if (res != GST_FLOW_REWINDING) {
3626       /* Tell the data collecting to expect this header. We don't do this when
3627        * rewinding since the states will have been resetted accordingly */
3628       stream->state = PENDING_PACKET_HEADER;
3629     }
3630   }
3631
3632   if (packet->payload && (res == GST_FLOW_OK || res == GST_FLOW_NOT_LINKED)
3633       && stream->pad) {
3634     gst_ts_demux_queue_data (demux, stream, packet);
3635     GST_LOG_OBJECT (demux, "current_size:%d, expected_size:%d",
3636         stream->current_size, stream->expected_size);
3637     /* Finally check if the data we queued completes a packet, or got too
3638      * large and needs output now */
3639     if ((stream->expected_size && stream->current_size >= stream->expected_size)
3640         || (stream->current_size >= MAX_PES_PAYLOAD)) {
3641       GST_LOG_OBJECT (demux, "pushing packet of size %u", stream->current_size);
3642       res = gst_ts_demux_push_pending_data (demux, stream, NULL);
3643     }
3644   }
3645
3646   /* We are rewinding to find a keyframe,
3647    * and didn't want the data to be queued
3648    */
3649   if (res == GST_FLOW_REWINDING)
3650     res = GST_FLOW_OK;
3651
3652   return res;
3653 }
3654
3655 static void
3656 gst_ts_demux_flush (MpegTSBase * base, gboolean hard)
3657 {
3658   GstTSDemux *demux = GST_TS_DEMUX_CAST (base);
3659
3660   gst_ts_demux_flush_streams (demux, hard);
3661
3662   g_mutex_lock (&demux->lock);
3663   gst_event_replace (&demux->segment_event, NULL);
3664   g_mutex_unlock (&demux->lock);
3665   if (demux->global_tags) {
3666     gst_tag_list_unref (demux->global_tags);
3667     demux->global_tags = NULL;
3668   }
3669   if (hard) {
3670     /* For pull mode seeks the current segment needs to be preserved */
3671     demux->rate = 1.0;
3672     gst_segment_init (&base->out_segment, GST_FORMAT_UNDEFINED);
3673   }
3674 }
3675
3676 static GstFlowReturn
3677 gst_ts_demux_drain (MpegTSBase * base)
3678 {
3679   GstTSDemux *demux = GST_TS_DEMUX_CAST (base);
3680   GList *tmp;
3681   GstFlowReturn res = GST_FLOW_OK;
3682
3683   if (!demux->program)
3684     return res;
3685
3686   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
3687     TSDemuxStream *stream = (TSDemuxStream *) tmp->data;
3688     if (stream->pad) {
3689       res = gst_ts_demux_push_pending_data (demux, stream, NULL);
3690       if (G_UNLIKELY (res != GST_FLOW_OK))
3691         break;
3692     }
3693   }
3694
3695   return res;
3696 }
3697
3698 static GstFlowReturn
3699 gst_ts_demux_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
3700     GstMpegtsSection * section)
3701 {
3702   GstTSDemux *demux = GST_TS_DEMUX_CAST (base);
3703   TSDemuxStream *stream = NULL;
3704   GstFlowReturn res = GST_FLOW_OK;
3705
3706   if (G_LIKELY (demux->program)) {
3707     stream = (TSDemuxStream *) demux->program->streams[packet->pid];
3708
3709     if (stream) {
3710       res = gst_ts_demux_handle_packet (demux, stream, packet, section);
3711     }
3712   }
3713   return res;
3714 }