tsdemux: Add support for Motorola DigiCipher II MPEG2 video
[platform/upstream/gstreamer.git] / gst / mpegtsdemux / tsdemux.c
1 /*
2  * tsdemux.c
3  * Copyright (C) 2009 Zaheer Abbas Merali
4  *               2010 Edward Hervey
5  * Copyright (C) 2011, Hewlett-Packard Development Company, L.P.
6  *  Author: Youness Alaoui <youness.alaoui@collabora.co.uk>, Collabora Ltd.
7  *  Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>, Collabora Ltd.
8  *  Author: Edward Hervey <bilboed@bilboed.com>, Collabora Ltd.
9  *
10  * Authors:
11  *   Zaheer Abbas Merali <zaheerabbas at merali dot org>
12  *   Edward Hervey <edward.hervey@collabora.co.uk>
13  *
14  * This library is free software; you can redistribute it and/or
15  * modify it under the terms of the GNU Library General Public
16  * License as published by the Free Software Foundation; either
17  * version 2 of the License, or (at your option) any later version.
18  *
19  * This library is distributed in the hope that it will be useful,
20  * but WITHOUT ANY WARRANTY; without even the implied warranty of
21  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22  * Library General Public License for more details.
23  *
24  * You should have received a copy of the GNU Library General Public
25  * License along with this library; if not, write to the
26  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
27  * Boston, MA 02110-1301, USA.
28  */
29
30 #ifdef HAVE_CONFIG_H
31 #include "config.h"
32 #endif
33
34 #include <stdlib.h>
35 #include <string.h>
36
37 #include <glib.h>
38 #include <gst/tag/tag.h>
39
40 #include "mpegtsbase.h"
41 #include "tsdemux.h"
42 #include "gstmpegdesc.h"
43 #include "gstmpegdefs.h"
44 #include "mpegtspacketizer.h"
45 #include "pesparse.h"
46
47 /*
48  * tsdemux
49  *
50  * See TODO for explanations on improvements needed
51  */
52
53 /* latency in mseconds */
54 #define TS_LATENCY 700
55
56 #define TABLE_ID_UNSET 0xFF
57
58 #define CONTINUITY_UNSET 255
59 #define MAX_CONTINUITY 15
60
61 #define PCR_WRAP_SIZE_128KBPS (((gint64)1490)*(1024*1024))
62 /* small PCR for wrap detection */
63 #define PCR_SMALL 17775000
64 /* maximal PCR time */
65 #define PCR_MAX_VALUE (((((guint64)1)<<33) * 300) + 298)
66 #define PTS_DTS_MAX_VALUE (((guint64)1) << 33)
67
68 /* Seeking/Scanning related variables */
69
70 /* seek to SEEK_TIMESTAMP_OFFSET before the desired offset and search then
71  * either accurately or for the next timestamp
72  */
73 #define SEEK_TIMESTAMP_OFFSET (500 * GST_MSECOND)
74
75 #define SEGMENT_FORMAT "[format:%s, rate:%f, start:%"                   \
76   GST_TIME_FORMAT", stop:%"GST_TIME_FORMAT", time:%"GST_TIME_FORMAT     \
77   ", base:%"GST_TIME_FORMAT", position:%"GST_TIME_FORMAT                \
78   ", duration:%"GST_TIME_FORMAT"]"
79
80 #define SEGMENT_ARGS(a) gst_format_get_name((a).format), (a).rate,      \
81     GST_TIME_ARGS((a).start), GST_TIME_ARGS((a).stop),                  \
82     GST_TIME_ARGS((a).time), GST_TIME_ARGS((a).base),                   \
83     GST_TIME_ARGS((a).position), GST_TIME_ARGS((a).duration)
84
85
86 GST_DEBUG_CATEGORY_STATIC (ts_demux_debug);
87 #define GST_CAT_DEFAULT ts_demux_debug
88
89 #define ABSDIFF(a,b) (((a) > (b)) ? ((a) - (b)) : ((b) - (a)))
90
91 static GQuark QUARK_TSDEMUX;
92 static GQuark QUARK_PID;
93 static GQuark QUARK_PCR;
94 static GQuark QUARK_OPCR;
95 static GQuark QUARK_PTS;
96 static GQuark QUARK_DTS;
97 static GQuark QUARK_OFFSET;
98
99 typedef enum
100 {
101   PENDING_PACKET_EMPTY = 0,     /* No pending packet/buffer
102                                  * Push incoming buffers to the array */
103   PENDING_PACKET_HEADER,        /* PES header needs to be parsed
104                                  * Push incoming buffers to the array */
105   PENDING_PACKET_BUFFER,        /* Currently filling up output buffer
106                                  * Push incoming buffers to the bufferlist */
107   PENDING_PACKET_DISCONT        /* Discontinuity in incoming packets
108                                  * Drop all incoming buffers */
109 } PendingPacketState;
110
111 typedef struct _TSDemuxStream TSDemuxStream;
112
113 struct _TSDemuxStream
114 {
115   MpegTSBaseStream stream;
116
117   GstPad *pad;
118   /* Whether the pad was added or not */
119   gboolean active;
120
121   /* the return of the latest push */
122   GstFlowReturn flow_return;
123
124   /* Output data */
125   PendingPacketState state;
126
127   /* Data to push (allocated) */
128   guint8 *data;
129
130   /* Size of data to push (if known) */
131   guint expected_size;
132
133   /* Size of currently queued data */
134   guint current_size;
135   guint allocated_size;
136
137   /* Current PTS/DTS for this stream */
138   GstClockTime pts;
139   GstClockTime dts;
140   /* Raw value of current PTS/DTS */
141   guint64 raw_pts;
142   guint64 raw_dts;
143   /* PTS/DTS with rollover fixed */
144   guint64 fixed_pts;
145   guint64 fixed_dts;
146   /* Number of rollover seen for PTS/DTS (default:0) */
147   guint nb_pts_rollover;
148   guint nb_dts_rollover;
149
150   /* Whether this stream needs to send a newsegment */
151   gboolean need_newsegment;
152
153   GstTagList *taglist;
154
155   gint continuity_counter;
156 };
157
158 #define VIDEO_CAPS \
159   GST_STATIC_CAPS (\
160     "video/mpeg, " \
161       "mpegversion = (int) { 1, 2, 4 }, " \
162       "systemstream = (boolean) FALSE; " \
163     "video/x-h264,stream-format=(string)byte-stream," \
164       "alignment=(string)nal;" \
165     "video/x-dirac;" \
166     "video/x-wmv," \
167       "wmvversion = (int) 3, " \
168       "format = (string) WVC1" \
169   )
170
171 #define AUDIO_CAPS \
172   GST_STATIC_CAPS ( \
173     "audio/mpeg, " \
174       "mpegversion = (int) 1;" \
175     "audio/mpeg, " \
176       "mpegversion = (int) 2, " \
177       "stream-format = (string) adts; " \
178     "audio/mpeg, " \
179       "mpegversion = (int) 4, " \
180       "stream-format = (string) loas; " \
181     "audio/x-lpcm, " \
182       "width = (int) { 16, 20, 24 }, " \
183       "rate = (int) { 48000, 96000 }, " \
184       "channels = (int) [ 1, 8 ], " \
185       "dynamic_range = (int) [ 0, 255 ], " \
186       "emphasis = (boolean) { FALSE, TRUE }, " \
187       "mute = (boolean) { FALSE, TRUE }; " \
188     "audio/x-ac3; audio/x-eac3;" \
189     "audio/x-dts;" \
190     "audio/x-private-ts-lpcm" \
191   )
192
193 /* Can also use the subpicture pads for text subtitles? */
194 #define SUBPICTURE_CAPS \
195     GST_STATIC_CAPS ("subpicture/x-pgs; subpicture/x-dvd")
196
197 static GstStaticPadTemplate video_template =
198 GST_STATIC_PAD_TEMPLATE ("video_%04x", GST_PAD_SRC,
199     GST_PAD_SOMETIMES,
200     VIDEO_CAPS);
201
202 static GstStaticPadTemplate audio_template =
203 GST_STATIC_PAD_TEMPLATE ("audio_%04x",
204     GST_PAD_SRC,
205     GST_PAD_SOMETIMES,
206     AUDIO_CAPS);
207
208 static GstStaticPadTemplate subpicture_template =
209 GST_STATIC_PAD_TEMPLATE ("subpicture_%04x",
210     GST_PAD_SRC,
211     GST_PAD_SOMETIMES,
212     SUBPICTURE_CAPS);
213
214 static GstStaticPadTemplate private_template =
215 GST_STATIC_PAD_TEMPLATE ("private_%04x",
216     GST_PAD_SRC,
217     GST_PAD_SOMETIMES,
218     GST_STATIC_CAPS_ANY);
219
220 enum
221 {
222   ARG_0,
223   PROP_PROGRAM_NUMBER,
224   PROP_EMIT_STATS,
225   /* FILL ME */
226 };
227
228 /* Pad functions */
229
230
231 /* mpegtsbase methods */
232 static void
233 gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program);
234 static void
235 gst_ts_demux_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program);
236 static void gst_ts_demux_reset (MpegTSBase * base);
237 static GstFlowReturn
238 gst_ts_demux_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
239     MpegTSPacketizerSection * section);
240 static void gst_ts_demux_flush (MpegTSBase * base);
241 static void
242 gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * stream,
243     MpegTSBaseProgram * program);
244 static void
245 gst_ts_demux_stream_removed (MpegTSBase * base, MpegTSBaseStream * stream);
246 static GstFlowReturn gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event);
247 static void gst_ts_demux_set_property (GObject * object, guint prop_id,
248     const GValue * value, GParamSpec * pspec);
249 static void gst_ts_demux_get_property (GObject * object, guint prop_id,
250     GValue * value, GParamSpec * pspec);
251 static void gst_ts_demux_flush_streams (GstTSDemux * tsdemux);
252 static GstFlowReturn
253 gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream);
254 static void gst_ts_demux_stream_flush (TSDemuxStream * stream);
255
256 static gboolean push_event (MpegTSBase * base, GstEvent * event);
257
258 static void
259 _extra_init (void)
260 {
261   QUARK_TSDEMUX = g_quark_from_string ("tsdemux");
262   QUARK_PID = g_quark_from_string ("pid");
263   QUARK_PCR = g_quark_from_string ("pcr");
264   QUARK_OPCR = g_quark_from_string ("opcr");
265   QUARK_PTS = g_quark_from_string ("pts");
266   QUARK_DTS = g_quark_from_string ("dts");
267   QUARK_OFFSET = g_quark_from_string ("offset");
268 }
269
270 #define gst_ts_demux_parent_class parent_class
271 G_DEFINE_TYPE_WITH_CODE (GstTSDemux, gst_ts_demux, GST_TYPE_MPEGTS_BASE,
272     _extra_init ());
273
274 static void
275 gst_ts_demux_class_init (GstTSDemuxClass * klass)
276 {
277   GObjectClass *gobject_class;
278   GstElementClass *element_class;
279   MpegTSBaseClass *ts_class;
280
281   gobject_class = G_OBJECT_CLASS (klass);
282   gobject_class->set_property = gst_ts_demux_set_property;
283   gobject_class->get_property = gst_ts_demux_get_property;
284
285   g_object_class_install_property (gobject_class, PROP_PROGRAM_NUMBER,
286       g_param_spec_int ("program-number", "Program number",
287           "Program Number to demux for (-1 to ignore)", -1, G_MAXINT,
288           -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
289
290   g_object_class_install_property (gobject_class, PROP_EMIT_STATS,
291       g_param_spec_boolean ("emit-stats", "Emit statistics",
292           "Emit messages for every pcr/opcr/pts/dts", FALSE,
293           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
294
295   element_class = GST_ELEMENT_CLASS (klass);
296   gst_element_class_add_pad_template (element_class,
297       gst_static_pad_template_get (&video_template));
298   gst_element_class_add_pad_template (element_class,
299       gst_static_pad_template_get (&audio_template));
300   gst_element_class_add_pad_template (element_class,
301       gst_static_pad_template_get (&subpicture_template));
302   gst_element_class_add_pad_template (element_class,
303       gst_static_pad_template_get (&private_template));
304
305   gst_element_class_set_static_metadata (element_class,
306       "MPEG transport stream demuxer",
307       "Codec/Demuxer",
308       "Demuxes MPEG2 transport streams",
309       "Zaheer Abbas Merali <zaheerabbas at merali dot org>\n"
310       "Edward Hervey <edward.hervey@collabora.co.uk>");
311
312   ts_class = GST_MPEGTS_BASE_CLASS (klass);
313   ts_class->reset = GST_DEBUG_FUNCPTR (gst_ts_demux_reset);
314   ts_class->push = GST_DEBUG_FUNCPTR (gst_ts_demux_push);
315   ts_class->push_event = GST_DEBUG_FUNCPTR (push_event);
316   ts_class->program_started = GST_DEBUG_FUNCPTR (gst_ts_demux_program_started);
317   ts_class->program_stopped = GST_DEBUG_FUNCPTR (gst_ts_demux_program_stopped);
318   ts_class->stream_added = gst_ts_demux_stream_added;
319   ts_class->stream_removed = gst_ts_demux_stream_removed;
320   ts_class->seek = GST_DEBUG_FUNCPTR (gst_ts_demux_do_seek);
321   ts_class->flush = GST_DEBUG_FUNCPTR (gst_ts_demux_flush);
322 }
323
324 static void
325 gst_ts_demux_reset (MpegTSBase * base)
326 {
327   GstTSDemux *demux = (GstTSDemux *) base;
328
329   demux->program_number = -1;
330   demux->calculate_update_segment = FALSE;
331
332   gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
333   if (demux->segment_event) {
334     gst_event_unref (demux->segment_event);
335     demux->segment_event = NULL;
336   }
337
338   if (demux->update_segment) {
339     gst_event_unref (demux->update_segment);
340     demux->update_segment = NULL;
341   }
342 }
343
344 static void
345 gst_ts_demux_init (GstTSDemux * demux)
346 {
347   GST_MPEGTS_BASE (demux)->stream_size = sizeof (TSDemuxStream);
348
349   gst_ts_demux_reset ((MpegTSBase *) demux);
350 }
351
352
353 static void
354 gst_ts_demux_set_property (GObject * object, guint prop_id,
355     const GValue * value, GParamSpec * pspec)
356 {
357   GstTSDemux *demux = GST_TS_DEMUX (object);
358
359   switch (prop_id) {
360     case PROP_PROGRAM_NUMBER:
361       /* FIXME: do something if program is switched as opposed to set at
362        * beginning */
363       demux->program_number = g_value_get_int (value);
364       break;
365     case PROP_EMIT_STATS:
366       demux->emit_statistics = g_value_get_boolean (value);
367       break;
368     default:
369       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
370   }
371 }
372
373 static void
374 gst_ts_demux_get_property (GObject * object, guint prop_id,
375     GValue * value, GParamSpec * pspec)
376 {
377   GstTSDemux *demux = GST_TS_DEMUX (object);
378
379   switch (prop_id) {
380     case PROP_PROGRAM_NUMBER:
381       g_value_set_int (value, demux->program_number);
382       break;
383     case PROP_EMIT_STATS:
384       g_value_set_boolean (value, demux->emit_statistics);
385       break;
386     default:
387       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
388   }
389 }
390
391 static gboolean
392 gst_ts_demux_srcpad_query (GstPad * pad, GstObject * parent, GstQuery * query)
393 {
394   gboolean res = TRUE;
395   GstFormat format;
396   GstTSDemux *demux;
397   MpegTSBase *base;
398
399   demux = GST_TS_DEMUX (parent);
400   base = GST_MPEGTS_BASE (demux);
401
402   switch (GST_QUERY_TYPE (query)) {
403     case GST_QUERY_DURATION:
404     {
405       GST_DEBUG ("query duration");
406       gst_query_parse_duration (query, &format, NULL);
407       if (format == GST_FORMAT_TIME) {
408         if (!gst_pad_peer_query (base->sinkpad, query)) {
409           gint64 val;
410
411           format = GST_FORMAT_BYTES;
412           if (!gst_pad_peer_query_duration (base->sinkpad, format, &val))
413             res = FALSE;
414           else {
415             GstClockTime dur =
416                 mpegts_packetizer_offset_to_ts (base->packetizer, val,
417                 demux->program->pcr_pid);
418             if (GST_CLOCK_TIME_IS_VALID (dur))
419               gst_query_set_duration (query, GST_FORMAT_TIME, dur);
420             else
421               res = FALSE;
422           }
423         }
424       } else {
425         GST_DEBUG_OBJECT (demux, "only query duration on TIME is supported");
426         res = FALSE;
427       }
428       break;
429     }
430     case GST_QUERY_LATENCY:
431     {
432       GST_DEBUG ("query latency");
433       res = gst_pad_peer_query (base->sinkpad, query);
434       if (res && base->upstream_live) {
435         GstClockTime min_lat, max_lat;
436         gboolean live;
437
438         /* According to H.222.0
439            Annex D.0.3 (System Time Clock recovery in the decoder)
440            and D.0.2 (Audio and video presentation synchronization)
441
442            We can end up with an interval of up to 700ms between valid
443            PCR/SCR. We therefore allow a latency of 700ms for that.
444          */
445         gst_query_parse_latency (query, &live, &min_lat, &max_lat);
446         if (min_lat != -1)
447           min_lat += 700 * GST_MSECOND;
448         if (max_lat != -1)
449           max_lat += 700 * GST_MSECOND;
450         gst_query_set_latency (query, live, min_lat, max_lat);
451       }
452       break;
453     }
454     case GST_QUERY_SEEKING:
455     {
456       GST_DEBUG ("query seeking");
457       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
458       if (format == GST_FORMAT_TIME) {
459         gboolean seekable = FALSE;
460
461         if (gst_pad_peer_query (base->sinkpad, query))
462           gst_query_parse_seeking (query, NULL, &seekable, NULL, NULL);
463
464         /* If upstream is not seekable in TIME format we use
465          * our own values here */
466         if (!seekable)
467           gst_query_set_seeking (query, GST_FORMAT_TIME,
468               demux->parent.mode != BASE_MODE_PUSHING, 0,
469               demux->segment.duration);
470       } else {
471         GST_DEBUG_OBJECT (demux, "only TIME is supported for query seeking");
472         res = FALSE;
473       }
474       break;
475     }
476     default:
477       res = gst_pad_query_default (pad, parent, query);
478   }
479
480   return res;
481
482 }
483
484 static GstFlowReturn
485 gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event)
486 {
487   GstTSDemux *demux = (GstTSDemux *) base;
488   GstFlowReturn res = GST_FLOW_ERROR;
489   gdouble rate;
490   GstFormat format;
491   GstSeekFlags flags;
492   GstSeekType start_type, stop_type;
493   gint64 start, stop;
494   GstSegment seeksegment;
495   gboolean update;
496   guint64 start_offset;
497
498   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
499       &stop_type, &stop);
500
501   if (format != GST_FORMAT_TIME) {
502     goto done;
503   }
504
505   GST_DEBUG ("seek event, rate: %f start: %" GST_TIME_FORMAT
506       " stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start),
507       GST_TIME_ARGS (stop));
508
509   if (flags & (GST_SEEK_FLAG_SEGMENT | GST_SEEK_FLAG_SKIP)) {
510     GST_WARNING ("seek flags 0x%x are not supported", (int) flags);
511     goto done;
512   }
513
514   /* copy segment, we need this because we still need the old
515    * segment when we close the current segment. */
516   memcpy (&seeksegment, &demux->segment, sizeof (GstSegment));
517   if (demux->segment_event) {
518     gst_event_unref (demux->segment_event);
519     demux->segment_event = NULL;
520   }
521   /* configure the segment with the seek variables */
522   GST_DEBUG_OBJECT (demux, "configuring seek");
523   GST_DEBUG ("seeksegment before set_seek " SEGMENT_FORMAT,
524       SEGMENT_ARGS (seeksegment));
525
526   gst_segment_do_seek (&seeksegment, rate, format, flags, start_type, start,
527       stop_type, stop, &update);
528
529   GST_DEBUG ("seeksegment after set_seek " SEGMENT_FORMAT,
530       SEGMENT_ARGS (seeksegment));
531
532   /* Convert start/stop to offset */
533   start_offset =
534       mpegts_packetizer_ts_to_offset (base->packetizer, MAX (0,
535           start - SEEK_TIMESTAMP_OFFSET), demux->program->pcr_pid);
536
537   if (G_UNLIKELY (start_offset == -1)) {
538     GST_WARNING ("Couldn't convert start position to an offset");
539     goto done;
540   }
541
542   /* record offset */
543   base->seek_offset = start_offset;
544   res = GST_FLOW_OK;
545
546   /* commit the new segment */
547   memcpy (&demux->segment, &seeksegment, sizeof (GstSegment));
548
549   if (demux->segment.flags & GST_SEEK_FLAG_SEGMENT) {
550     gst_element_post_message (GST_ELEMENT_CAST (demux),
551         gst_message_new_segment_start (GST_OBJECT_CAST (demux),
552             demux->segment.format, demux->segment.stop));
553   }
554
555 done:
556   return res;
557 }
558
559 static gboolean
560 gst_ts_demux_srcpad_event (GstPad * pad, GstObject * parent, GstEvent * event)
561 {
562   gboolean res = TRUE;
563   GstTSDemux *demux = GST_TS_DEMUX (parent);
564
565   GST_DEBUG_OBJECT (pad, "Got event %s",
566       gst_event_type_get_name (GST_EVENT_TYPE (event)));
567
568   switch (GST_EVENT_TYPE (event)) {
569     case GST_EVENT_SEEK:
570       res = mpegts_base_handle_seek_event ((MpegTSBase *) demux, pad, event);
571       if (!res)
572         GST_WARNING ("seeking failed");
573       gst_event_unref (event);
574       break;
575     default:
576       res = gst_pad_event_default (pad, parent, event);
577   }
578
579   return res;
580 }
581
582 static gboolean
583 push_event (MpegTSBase * base, GstEvent * event)
584 {
585   GstTSDemux *demux = (GstTSDemux *) base;
586   GList *tmp;
587
588   if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
589     GST_DEBUG_OBJECT (base, "Ignoring segment event (recreated later)");
590     gst_event_unref (event);
591     return TRUE;
592   }
593
594   if (G_UNLIKELY (demux->program == NULL)) {
595     gst_event_unref (event);
596     return FALSE;
597   }
598
599   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
600     TSDemuxStream *stream = (TSDemuxStream *) tmp->data;
601     if (stream->pad) {
602       gst_event_ref (event);
603       gst_pad_push_event (stream->pad, event);
604     }
605   }
606
607   gst_event_unref (event);
608
609   return TRUE;
610 }
611
612 static GstFlowReturn
613 tsdemux_combine_flows (GstTSDemux * demux, TSDemuxStream * stream,
614     GstFlowReturn ret)
615 {
616   GList *tmp;
617
618   /* Store the value */
619   stream->flow_return = ret;
620
621   /* any other error that is not-linked can be returned right away */
622   if (ret != GST_FLOW_NOT_LINKED)
623     goto done;
624
625   /* Only return NOT_LINKED if all other pads returned NOT_LINKED */
626   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
627     stream = (TSDemuxStream *) tmp->data;
628     if (stream->pad) {
629       ret = stream->flow_return;
630       /* some other return value (must be SUCCESS but we can return
631        * other values as well) */
632       if (ret != GST_FLOW_NOT_LINKED)
633         goto done;
634     }
635     /* if we get here, all other pads were unlinked and we return
636      * NOT_LINKED then */
637   }
638
639 done:
640   return ret;
641 }
642
643 static void
644 gst_ts_demux_create_tags (TSDemuxStream * stream)
645 {
646   guint8 *desc = NULL;
647   int i;
648
649   desc = mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
650       DESC_ISO_639_LANGUAGE);
651
652   if (!desc) {
653     desc = mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
654         DESC_DVB_SUBTITLING);
655   }
656
657   if (desc) {
658     if (!stream->taglist)
659       stream->taglist = gst_tag_list_new_empty ();
660
661     for (i = 0; i < DESC_ISO_639_LANGUAGE_codes_n (desc); i++) {
662       const gchar *lc;
663       gchar lang_code[4];
664       gchar *language_n;
665
666       language_n = (gchar *)
667           DESC_ISO_639_LANGUAGE_language_code_nth (desc, i);
668
669       /* Language codes should be 3 character long, we allow
670        * a bit more flexibility by allowing 2 characters. */
671       if (!language_n[0] || !language_n[1])
672         continue;
673
674       GST_LOG ("Add language code for stream: %s", language_n);
675
676       lang_code[0] = language_n[0];
677       lang_code[1] = language_n[1];
678       lang_code[2] = language_n[2];
679       lang_code[3] = 0;
680
681       /* descriptor contains ISO 639-2 code, we want the ISO 639-1 code */
682       lc = gst_tag_get_language_code (lang_code);
683       gst_tag_list_add (stream->taglist, GST_TAG_MERGE_REPLACE,
684           GST_TAG_LANGUAGE_CODE, (lc) ? lc : lang_code, NULL);
685     }
686
687     g_free (desc);
688   }
689 }
690
691 static GstPad *
692 create_pad_for_stream (MpegTSBase * base, MpegTSBaseStream * bstream,
693     MpegTSBaseProgram * program)
694 {
695   TSDemuxStream *stream = (TSDemuxStream *) bstream;
696   gchar *name = NULL;
697   GstCaps *caps = NULL;
698   GstPadTemplate *template = NULL;
699   guint8 *desc = NULL;
700   GstPad *pad = NULL;
701
702   gst_ts_demux_create_tags (stream);
703
704   GST_LOG ("Attempting to create pad for stream 0x%04x with stream_type %d",
705       bstream->pid, bstream->stream_type);
706
707   /* First handle BluRay-specific stream types since there is some overlap
708    * between BluRay and non-BluRay streay type identifiers */
709   desc = mpegts_get_descriptor_from_program (program, DESC_REGISTRATION);
710   if (desc) {
711     if (DESC_REGISTRATION_format_identifier (desc) == DRF_ID_HDMV) {
712       switch (bstream->stream_type) {
713         case ST_BD_AUDIO_AC3:
714         {
715           guint8 *ac3_desc;
716
717           /* ATSC ac3 audio descriptor */
718           ac3_desc =
719               mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
720               DESC_AC3_AUDIO_STREAM);
721           if (ac3_desc && DESC_AC_AUDIO_STREAM_bsid (ac3_desc) != 16) {
722             GST_LOG ("ac3 audio");
723             template = gst_static_pad_template_get (&audio_template);
724             name = g_strdup_printf ("audio_%04x", bstream->pid);
725             caps = gst_caps_new_empty_simple ("audio/x-ac3");
726
727             g_free (ac3_desc);
728           } else {
729             template = gst_static_pad_template_get (&audio_template);
730             name = g_strdup_printf ("audio_%04x", bstream->pid);
731             caps = gst_caps_new_empty_simple ("audio/x-eac3");
732           }
733           break;
734         }
735         case ST_BD_AUDIO_EAC3:
736           template = gst_static_pad_template_get (&audio_template);
737           name = g_strdup_printf ("audio_%04x", bstream->pid);
738           caps = gst_caps_new_empty_simple ("audio/x-eac3");
739           break;
740         case ST_BD_AUDIO_AC3_TRUE_HD:
741           template = gst_static_pad_template_get (&audio_template);
742           name = g_strdup_printf ("audio_%04x", bstream->pid);
743           caps = gst_caps_new_empty_simple ("audio/x-true-hd");
744           break;
745         case ST_BD_AUDIO_LPCM:
746           template = gst_static_pad_template_get (&audio_template);
747           name = g_strdup_printf ("audio_%04x", bstream->pid);
748           caps = gst_caps_new_empty_simple ("audio/x-private-ts-lpcm");
749           break;
750         case ST_BD_PGS_SUBPICTURE:
751           template = gst_static_pad_template_get (&subpicture_template);
752           name = g_strdup_printf ("subpicture_%04x", bstream->pid);
753           caps = gst_caps_new_empty_simple ("subpicture/x-pgs");
754           break;
755       }
756     }
757     g_free (desc);
758   }
759   if (template && name && caps)
760     goto done;
761
762   /* Handle non-BluRay stream types */
763   switch (bstream->stream_type) {
764     case ST_VIDEO_MPEG1:
765     case ST_VIDEO_MPEG2:
766     case ST_PS_VIDEO_MPEG2_DCII:
767       GST_LOG ("mpeg video");
768       template = gst_static_pad_template_get (&video_template);
769       name = g_strdup_printf ("video_%04x", bstream->pid);
770       caps = gst_caps_new_simple ("video/mpeg",
771           "mpegversion", G_TYPE_INT,
772           bstream->stream_type == ST_VIDEO_MPEG1 ? 1 : 2, "systemstream",
773           G_TYPE_BOOLEAN, FALSE, NULL);
774
775       break;
776     case ST_AUDIO_MPEG1:
777     case ST_AUDIO_MPEG2:
778       GST_LOG ("mpeg audio");
779       template = gst_static_pad_template_get (&audio_template);
780       name = g_strdup_printf ("audio_%04x", bstream->pid);
781       caps =
782           gst_caps_new_simple ("audio/mpeg", "mpegversion", G_TYPE_INT, 1,
783           NULL);
784       break;
785     case ST_PRIVATE_DATA:
786       GST_LOG ("private data");
787       desc = mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
788           DESC_DVB_AC3);
789       if (desc) {
790         GST_LOG ("ac3 audio");
791         template = gst_static_pad_template_get (&audio_template);
792         name = g_strdup_printf ("audio_%04x", bstream->pid);
793         caps = gst_caps_new_empty_simple ("audio/x-ac3");
794         g_free (desc);
795         break;
796       }
797
798       desc = mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
799           DESC_DVB_ENHANCED_AC3);
800       if (desc) {
801         GST_LOG ("ac3 audio");
802         template = gst_static_pad_template_get (&audio_template);
803         name = g_strdup_printf ("audio_%04x", bstream->pid);
804         caps = gst_caps_new_empty_simple ("audio/x-eac3");
805         g_free (desc);
806         break;
807       }
808       desc = mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
809           DESC_DVB_TELETEXT);
810       if (desc) {
811         GST_LOG ("teletext");
812         template = gst_static_pad_template_get (&private_template);
813         name = g_strdup_printf ("private_%04x", bstream->pid);
814         caps = gst_caps_new_empty_simple ("private/teletext");
815         g_free (desc);
816         break;
817       }
818       desc =
819           mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
820           DESC_DVB_SUBTITLING);
821       if (desc) {
822         GST_LOG ("subtitling");
823         template = gst_static_pad_template_get (&private_template);
824         name = g_strdup_printf ("private_%04x", bstream->pid);
825         caps = gst_caps_new_empty_simple ("subpicture/x-dvb");
826         g_free (desc);
827         break;
828       }
829
830       desc = mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
831           DESC_REGISTRATION);
832       if (desc) {
833         switch (DESC_REGISTRATION_format_identifier (desc)) {
834           case DRF_ID_DTS1:
835           case DRF_ID_DTS2:
836           case DRF_ID_DTS3:
837             /* SMPTE registered DTS */
838             GST_LOG ("subtitling");
839             template = gst_static_pad_template_get (&private_template);
840             name = g_strdup_printf ("private_%04x", bstream->pid);
841             caps = gst_caps_new_empty_simple ("audio/x-dts");
842             break;
843           case DRF_ID_S302M:
844             template = gst_static_pad_template_get (&audio_template);
845             name = g_strdup_printf ("audio_%04x", bstream->pid);
846             caps = gst_caps_new_empty_simple ("audio/x-smpte-302m");
847             break;
848         }
849         g_free (desc);
850       }
851       if (template)
852         break;
853
854       /* hack for itv hd (sid 10510, video pid 3401 */
855       if (program->program_number == 10510 && bstream->pid == 3401) {
856         template = gst_static_pad_template_get (&video_template);
857         name = g_strdup_printf ("video_%04x", bstream->pid);
858         caps = gst_caps_new_simple ("video/x-h264",
859             "stream-format", G_TYPE_STRING, "byte-stream",
860             "alignment", G_TYPE_STRING, "nal", NULL);
861       }
862       break;
863     case ST_HDV_AUX_V:
864       /* We don't expose those streams since they're only helper streams */
865       /* template = gst_static_pad_template_get (&private_template); */
866       /* name = g_strdup_printf ("private_%04x", bstream->pid); */
867       /* caps = gst_caps_new_simple ("hdv/aux-v", NULL); */
868       break;
869     case ST_HDV_AUX_A:
870       /* We don't expose those streams since they're only helper streams */
871       /* template = gst_static_pad_template_get (&private_template); */
872       /* name = g_strdup_printf ("private_%04x", bstream->pid); */
873       /* caps = gst_caps_new_simple ("hdv/aux-a", NULL); */
874       break;
875     case ST_PRIVATE_SECTIONS:
876     case ST_MHEG:
877     case ST_DSMCC:
878     case ST_DSMCC_A:
879     case ST_DSMCC_B:
880     case ST_DSMCC_C:
881     case ST_DSMCC_D:
882       MPEGTS_BIT_UNSET (base->is_pes, bstream->pid);
883       break;
884     case ST_AUDIO_AAC_ADTS:
885       template = gst_static_pad_template_get (&audio_template);
886       name = g_strdup_printf ("audio_%04x", bstream->pid);
887       caps = gst_caps_new_simple ("audio/mpeg",
888           "mpegversion", G_TYPE_INT, 2,
889           "stream-format", G_TYPE_STRING, "adts", NULL);
890       break;
891     case ST_AUDIO_AAC_LATM:
892       template = gst_static_pad_template_get (&audio_template);
893       name = g_strdup_printf ("audio_%04x", bstream->pid);
894       caps = gst_caps_new_simple ("audio/mpeg",
895           "mpegversion", G_TYPE_INT, 4,
896           "stream-format", G_TYPE_STRING, "loas", NULL);
897       break;
898     case ST_VIDEO_MPEG4:
899       template = gst_static_pad_template_get (&video_template);
900       name = g_strdup_printf ("video_%04x", bstream->pid);
901       caps = gst_caps_new_simple ("video/mpeg",
902           "mpegversion", G_TYPE_INT, 4,
903           "systemstream", G_TYPE_BOOLEAN, FALSE, NULL);
904       break;
905     case ST_VIDEO_H264:
906       template = gst_static_pad_template_get (&video_template);
907       name = g_strdup_printf ("video_%04x", bstream->pid);
908       caps = gst_caps_new_simple ("video/x-h264",
909           "stream-format", G_TYPE_STRING, "byte-stream",
910           "alignment", G_TYPE_STRING, "nal", NULL);
911       break;
912     case ST_VIDEO_DIRAC:
913       desc =
914           mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
915           DESC_REGISTRATION);
916       if (desc) {
917         if (DESC_LENGTH (desc) >= 4) {
918           if (DESC_REGISTRATION_format_identifier (desc) == 0x64726163) {
919             GST_LOG ("dirac");
920             /* dirac in hex */
921             template = gst_static_pad_template_get (&video_template);
922             name = g_strdup_printf ("video_%04x", bstream->pid);
923             caps = gst_caps_new_empty_simple ("video/x-dirac");
924           }
925         }
926         g_free (desc);
927       }
928       break;
929     case ST_PRIVATE_EA:        /* Try to detect a VC1 stream */
930     {
931       gboolean is_vc1 = FALSE;
932       desc =
933           mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
934           DESC_REGISTRATION);
935       if (desc) {
936         if (DESC_LENGTH (desc) >= 4) {
937           if (DESC_REGISTRATION_format_identifier (desc) == DRF_ID_VC1) {
938             is_vc1 = TRUE;
939           }
940         }
941         g_free (desc);
942       }
943       if (!is_vc1) {
944         GST_WARNING ("0xea private stream type found but no descriptor "
945             "for VC1. Assuming plain VC1.");
946       }
947
948       template = gst_static_pad_template_get (&video_template);
949       name = g_strdup_printf ("video_%04x", bstream->pid);
950       caps = gst_caps_new_simple ("video/x-wmv",
951           "wmvversion", G_TYPE_INT, 3, "format", G_TYPE_STRING, "WVC1", NULL);
952
953       break;
954     }
955     case ST_PS_AUDIO_AC3:
956       /* DVB_ENHANCED_AC3 */
957       desc = mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
958           DESC_DVB_ENHANCED_AC3);
959       if (desc) {
960         template = gst_static_pad_template_get (&audio_template);
961         name = g_strdup_printf ("audio_%04x", bstream->pid);
962         caps = gst_caps_new_empty_simple ("audio/x-eac3");
963         g_free (desc);
964         break;
965       }
966
967       /* DVB_AC3 */
968       desc =
969           mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
970           DESC_DVB_AC3);
971       if (!desc)
972         GST_WARNING ("AC3 stream type found but no corresponding "
973             "descriptor to differentiate between AC3 and EAC3. "
974             "Assuming plain AC3.");
975       else
976         g_free (desc);
977       template = gst_static_pad_template_get (&audio_template);
978       name = g_strdup_printf ("audio_%04x", bstream->pid);
979       caps = gst_caps_new_empty_simple ("audio/x-ac3");
980       break;
981     case ST_PS_AUDIO_DTS:
982       template = gst_static_pad_template_get (&audio_template);
983       name = g_strdup_printf ("audio_%04x", bstream->pid);
984       caps = gst_caps_new_empty_simple ("audio/x-dts");
985       break;
986     case ST_PS_AUDIO_LPCM:
987       template = gst_static_pad_template_get (&audio_template);
988       name = g_strdup_printf ("audio_%04x", bstream->pid);
989       caps = gst_caps_new_empty_simple ("audio/x-lpcm");
990       break;
991     case ST_PS_DVD_SUBPICTURE:
992       template = gst_static_pad_template_get (&subpicture_template);
993       name = g_strdup_printf ("subpicture_%04x", bstream->pid);
994       caps = gst_caps_new_empty_simple ("subpicture/x-dvd");
995       break;
996     default:
997       GST_WARNING ("Non-media stream (stream_type:0x%x). Not creating pad",
998           bstream->stream_type);
999       break;
1000   }
1001
1002 done:
1003   if (template && name && caps) {
1004     gchar *stream_id;
1005
1006     GST_LOG ("stream:%p creating pad with name %s and caps %s", stream, name,
1007         gst_caps_to_string (caps));
1008     pad = gst_pad_new_from_template (template, name);
1009     gst_pad_set_active (pad, TRUE);
1010     gst_pad_use_fixed_caps (pad);
1011     stream_id =
1012         gst_pad_create_stream_id_printf (pad, GST_ELEMENT_CAST (base), "%08x",
1013         bstream->pid);
1014     gst_pad_push_event (pad, gst_event_new_stream_start (stream_id));
1015     g_free (stream_id);
1016     gst_pad_set_caps (pad, caps);
1017     gst_pad_set_query_function (pad, gst_ts_demux_srcpad_query);
1018     gst_pad_set_event_function (pad, gst_ts_demux_srcpad_event);
1019   }
1020
1021   if (name)
1022     g_free (name);
1023   if (template)
1024     gst_object_unref (template);
1025   if (caps)
1026     gst_caps_unref (caps);
1027
1028   return pad;
1029 }
1030
1031 static void
1032 gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * bstream,
1033     MpegTSBaseProgram * program)
1034 {
1035   TSDemuxStream *stream = (TSDemuxStream *) bstream;
1036
1037   if (!stream->pad) {
1038     /* Create the pad */
1039     if (bstream->stream_type != 0xff)
1040       stream->pad = create_pad_for_stream (base, bstream, program);
1041     stream->active = FALSE;
1042
1043     stream->need_newsegment = TRUE;
1044     stream->pts = GST_CLOCK_TIME_NONE;
1045     stream->dts = GST_CLOCK_TIME_NONE;
1046     stream->raw_pts = 0;
1047     stream->raw_dts = 0;
1048     stream->fixed_pts = 0;
1049     stream->fixed_dts = 0;
1050     stream->nb_pts_rollover = 0;
1051     stream->nb_dts_rollover = 0;
1052     stream->continuity_counter = CONTINUITY_UNSET;
1053   }
1054   stream->flow_return = GST_FLOW_OK;
1055 }
1056
1057 static void
1058 gst_ts_demux_stream_removed (MpegTSBase * base, MpegTSBaseStream * bstream)
1059 {
1060   TSDemuxStream *stream = (TSDemuxStream *) bstream;
1061
1062   if (stream->pad) {
1063     if (stream->active && gst_pad_is_active (stream->pad)) {
1064       /* Flush out all data */
1065       GST_DEBUG_OBJECT (stream->pad, "Flushing out pending data");
1066       gst_ts_demux_push_pending_data ((GstTSDemux *) base, stream);
1067
1068       GST_DEBUG_OBJECT (stream->pad, "Pushing out EOS");
1069       gst_pad_push_event (stream->pad, gst_event_new_eos ());
1070       GST_DEBUG_OBJECT (stream->pad, "Deactivating and removing pad");
1071       gst_pad_set_active (stream->pad, FALSE);
1072       gst_element_remove_pad (GST_ELEMENT_CAST (base), stream->pad);
1073       stream->active = FALSE;
1074     }
1075     stream->pad = NULL;
1076   }
1077   gst_ts_demux_stream_flush (stream);
1078   stream->flow_return = GST_FLOW_NOT_LINKED;
1079 }
1080
1081 static void
1082 activate_pad_for_stream (GstTSDemux * tsdemux, TSDemuxStream * stream)
1083 {
1084   GList *tmp;
1085   gboolean alldone = TRUE;
1086
1087   if (stream->pad) {
1088     GST_DEBUG_OBJECT (tsdemux, "Activating pad %s:%s for stream %p",
1089         GST_DEBUG_PAD_NAME (stream->pad), stream);
1090     gst_element_add_pad ((GstElement *) tsdemux, stream->pad);
1091     stream->active = TRUE;
1092     GST_DEBUG_OBJECT (stream->pad, "done adding pad");
1093
1094     /* Check if all pads were activated, and if so emit no-more-pads */
1095     for (tmp = tsdemux->program->stream_list; tmp; tmp = tmp->next) {
1096       stream = (TSDemuxStream *) tmp->data;
1097       if (stream->pad && !stream->active)
1098         alldone = FALSE;
1099     }
1100     if (alldone) {
1101       GST_DEBUG_OBJECT (tsdemux, "All pads were activated, emit no-more-pads");
1102       gst_element_no_more_pads ((GstElement *) tsdemux);
1103     }
1104   } else
1105     GST_WARNING_OBJECT (tsdemux,
1106         "stream %p (pid 0x%04x, type:0x%03x) has no pad", stream,
1107         ((MpegTSBaseStream *) stream)->pid,
1108         ((MpegTSBaseStream *) stream)->stream_type);
1109 }
1110
1111 static void
1112 gst_ts_demux_stream_flush (TSDemuxStream * stream)
1113 {
1114   stream->pts = GST_CLOCK_TIME_NONE;
1115
1116   GST_DEBUG ("flushing stream %p", stream);
1117
1118   if (stream->data)
1119     g_free (stream->data);
1120   stream->data = NULL;
1121   stream->state = PENDING_PACKET_EMPTY;
1122   stream->expected_size = 0;
1123   stream->allocated_size = 0;
1124   stream->current_size = 0;
1125   stream->need_newsegment = TRUE;
1126   stream->pts = GST_CLOCK_TIME_NONE;
1127   stream->dts = GST_CLOCK_TIME_NONE;
1128   stream->raw_pts = 0;
1129   stream->raw_dts = 0;
1130   stream->fixed_pts = 0;
1131   stream->fixed_dts = 0;
1132   stream->nb_pts_rollover = 0;
1133   stream->nb_dts_rollover = 0;
1134   if (stream->flow_return == GST_FLOW_FLUSHING) {
1135     stream->flow_return = GST_FLOW_OK;
1136   }
1137   stream->continuity_counter = CONTINUITY_UNSET;
1138 }
1139
1140 static void
1141 gst_ts_demux_flush_streams (GstTSDemux * demux)
1142 {
1143   g_list_foreach (demux->program->stream_list,
1144       (GFunc) gst_ts_demux_stream_flush, NULL);
1145 }
1146
1147 static void
1148 gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
1149 {
1150   GstTSDemux *demux = GST_TS_DEMUX (base);
1151
1152   GST_DEBUG ("Current program %d, new program %d",
1153       demux->program_number, program->program_number);
1154
1155   if (demux->program_number == -1 ||
1156       demux->program_number == program->program_number) {
1157
1158     GST_LOG ("program %d started", program->program_number);
1159     demux->program_number = program->program_number;
1160     demux->program = program;
1161
1162     /* If this is not the initial program, we need to calculate
1163      * an update newsegment */
1164     demux->calculate_update_segment = !program->initial_program;
1165
1166     /* FIXME : When do we emit no_more_pads ? */
1167   }
1168 }
1169
1170 static void
1171 gst_ts_demux_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program)
1172 {
1173   GstTSDemux *demux = GST_TS_DEMUX (base);
1174
1175   if (demux->program == program) {
1176     demux->program = NULL;
1177     demux->program_number = -1;
1178   }
1179 }
1180
1181
1182 static inline void
1183 gst_ts_demux_record_pts (GstTSDemux * demux, TSDemuxStream * stream,
1184     guint64 pts, guint64 offset)
1185 {
1186   MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
1187
1188   if (pts == -1) {
1189     stream->pts = GST_CLOCK_TIME_NONE;
1190     return;
1191   }
1192
1193   GST_LOG ("pid 0x%04x pts:%" G_GUINT64_FORMAT " at offset %"
1194       G_GUINT64_FORMAT, bs->pid, pts, offset);
1195
1196   if (G_UNLIKELY (GST_CLOCK_TIME_IS_VALID (stream->pts) &&
1197           ABSDIFF (stream->raw_pts, pts) > 900000)) {
1198     /* Detect rollover if diff > 10s */
1199     GST_LOG ("Detected rollover (previous:%" G_GUINT64_FORMAT " new:%"
1200         G_GUINT64_FORMAT ")", stream->raw_pts, pts);
1201     if (pts < stream->raw_pts) {
1202       /* Forward rollover */
1203       GST_LOG ("Forward rollover, incrementing nb_pts_rollover");
1204       stream->nb_pts_rollover++;
1205     } else {
1206       /* Reverse rollover */
1207       GST_LOG ("Reverse rollover, decrementing nb_pts_rollover");
1208       stream->nb_pts_rollover--;
1209     }
1210   }
1211
1212   /* Compute PTS in GstClockTime */
1213   stream->raw_pts = pts;
1214   stream->fixed_pts = pts + stream->nb_pts_rollover * PTS_DTS_MAX_VALUE;
1215   stream->pts = MPEGTIME_TO_GSTTIME (stream->fixed_pts);
1216
1217   GST_LOG ("pid 0x%04x Stored PTS %" G_GUINT64_FORMAT " (%" GST_TIME_FORMAT ")",
1218       bs->pid, stream->raw_pts, GST_TIME_ARGS (stream->pts));
1219
1220
1221   if (G_UNLIKELY (demux->emit_statistics)) {
1222     GstStructure *st;
1223     st = gst_structure_new_id_empty (QUARK_TSDEMUX);
1224     gst_structure_id_set (st,
1225         QUARK_PID, G_TYPE_UINT, bs->pid,
1226         QUARK_OFFSET, G_TYPE_UINT64, offset, QUARK_PTS, G_TYPE_UINT64, pts,
1227         NULL);
1228     gst_element_post_message (GST_ELEMENT_CAST (demux),
1229         gst_message_new_element (GST_OBJECT (demux), st));
1230   }
1231 }
1232
1233 static inline void
1234 gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream,
1235     guint64 dts, guint64 offset)
1236 {
1237   MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
1238
1239   if (dts == -1) {
1240     stream->dts = GST_CLOCK_TIME_NONE;
1241     return;
1242   }
1243
1244   GST_LOG ("pid 0x%04x dts:%" G_GUINT64_FORMAT " at offset %"
1245       G_GUINT64_FORMAT, bs->pid, dts, offset);
1246
1247   if (G_UNLIKELY (GST_CLOCK_TIME_IS_VALID (stream->dts) &&
1248           ABSDIFF (stream->raw_dts, dts) > 900000)) {
1249     /* Detect rollover if diff > 10s */
1250     GST_LOG ("Detected rollover (previous:%" G_GUINT64_FORMAT " new:%"
1251         G_GUINT64_FORMAT ")", stream->raw_dts, dts);
1252     if (dts < stream->raw_dts) {
1253       /* Forward rollover */
1254       GST_LOG ("Forward rollover, incrementing nb_dts_rollover");
1255       stream->nb_dts_rollover++;
1256     } else {
1257       /* Reverse rollover */
1258       GST_LOG ("Reverse rollover, decrementing nb_dts_rollover");
1259       stream->nb_dts_rollover--;
1260     }
1261   }
1262
1263   /* Compute DTS in GstClockTime */
1264   stream->raw_dts = dts;
1265   stream->fixed_dts = dts + stream->nb_dts_rollover * PTS_DTS_MAX_VALUE;
1266   stream->dts = MPEGTIME_TO_GSTTIME (stream->fixed_dts);
1267
1268   GST_LOG ("pid 0x%04x Stored DTS %" G_GUINT64_FORMAT " (%" GST_TIME_FORMAT ")",
1269       bs->pid, stream->raw_dts, GST_TIME_ARGS (stream->dts));
1270
1271   if (G_UNLIKELY (demux->emit_statistics)) {
1272     GstStructure *st;
1273     st = gst_structure_new_id_empty (QUARK_TSDEMUX);
1274     gst_structure_id_set (st,
1275         QUARK_PID, G_TYPE_UINT, bs->pid,
1276         QUARK_OFFSET, G_TYPE_UINT64, offset, QUARK_DTS, G_TYPE_UINT64, dts,
1277         NULL);
1278     gst_element_post_message (GST_ELEMENT_CAST (demux),
1279         gst_message_new_element (GST_OBJECT (demux), st));
1280   }
1281 }
1282
1283 static void
1284 gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream,
1285     guint8 * data, guint32 length, guint64 bufferoffset)
1286 {
1287   MpegTSBase *base = (MpegTSBase *) demux;
1288   PESHeader header;
1289   gint offset = 0;
1290   PESParsingResult parseres;
1291
1292   GST_MEMDUMP ("Header buffer", data, MIN (length, 32));
1293
1294   parseres = mpegts_parse_pes_header (data, length, &header, &offset);
1295   if (G_UNLIKELY (parseres == PES_PARSING_NEED_MORE))
1296     goto discont;
1297   if (G_UNLIKELY (parseres == PES_PARSING_BAD)) {
1298     GST_WARNING ("Error parsing PES header. pid: 0x%x stream_type: 0x%x",
1299         stream->stream.pid, stream->stream.stream_type);
1300     goto discont;
1301   }
1302
1303   gst_ts_demux_record_dts (demux, stream, header.DTS, bufferoffset);
1304   gst_ts_demux_record_pts (demux, stream, header.PTS, bufferoffset);
1305
1306   GST_DEBUG_OBJECT (base,
1307       "stream PTS %" GST_TIME_FORMAT " DTS %" GST_TIME_FORMAT,
1308       GST_TIME_ARGS (stream->pts), GST_TIME_ARGS (stream->dts));
1309
1310   /* Remove PES headers */
1311   GST_DEBUG ("Moving data forward by %d bytes (packet_size:%d, have:%d)",
1312       header.header_size, header.packet_length, length);
1313   stream->expected_size = header.packet_length;
1314   if (stream->expected_size) {
1315     if (G_LIKELY (stream->expected_size > header.header_size)) {
1316       stream->expected_size -= header.header_size;
1317     } else {
1318       /* next packet will have to complete this one */
1319       GST_ERROR ("invalid header and packet size combination");
1320       stream->expected_size = 0;
1321     }
1322   }
1323   data += header.header_size;
1324   length -= header.header_size;
1325
1326   /* Create the output buffer */
1327   if (stream->expected_size)
1328     stream->allocated_size = stream->expected_size;
1329   else
1330     stream->allocated_size = 8192;
1331   g_assert (stream->data == NULL);
1332   stream->data = g_malloc (stream->allocated_size);
1333   memcpy (stream->data, data, length);
1334   stream->current_size = length;
1335
1336   stream->state = PENDING_PACKET_BUFFER;
1337
1338   return;
1339
1340 discont:
1341   stream->state = PENDING_PACKET_DISCONT;
1342   return;
1343 }
1344
1345  /* ONLY CALL THIS:
1346   * * WITH packet->payload != NULL
1347   * * WITH pending/current flushed out if beginning of new PES packet
1348   */
1349 static inline void
1350 gst_ts_demux_queue_data (GstTSDemux * demux, TSDemuxStream * stream,
1351     MpegTSPacketizerPacket * packet)
1352 {
1353   guint8 *data;
1354   guint size;
1355
1356   GST_DEBUG ("pid: 0x%04x state:%d", stream->stream.pid, stream->state);
1357
1358   size = packet->data_end - packet->payload;
1359   data = packet->payload;
1360
1361   if (stream->continuity_counter == CONTINUITY_UNSET) {
1362     GST_DEBUG ("CONTINUITY: Initialize to %d", packet->continuity_counter);
1363   } else if ((packet->continuity_counter == stream->continuity_counter + 1 ||
1364           (stream->continuity_counter == MAX_CONTINUITY &&
1365               packet->continuity_counter == 0))) {
1366     GST_LOG ("CONTINUITY: Got expected %d", packet->continuity_counter);
1367   } else {
1368     GST_ERROR ("CONTINUITY: Mismatch packet %d, stream %d",
1369         packet->continuity_counter, stream->continuity_counter);
1370     stream->state = PENDING_PACKET_DISCONT;
1371   }
1372   stream->continuity_counter = packet->continuity_counter;
1373
1374   if (stream->state == PENDING_PACKET_EMPTY) {
1375     if (G_UNLIKELY (!packet->payload_unit_start_indicator)) {
1376       stream->state = PENDING_PACKET_DISCONT;
1377       GST_WARNING ("Didn't get the first packet of this PES");
1378     } else {
1379       GST_LOG ("EMPTY=>HEADER");
1380       stream->state = PENDING_PACKET_HEADER;
1381     }
1382   }
1383
1384   switch (stream->state) {
1385     case PENDING_PACKET_HEADER:
1386     {
1387       GST_LOG ("HEADER: Parsing PES header");
1388
1389       /* parse the header */
1390       gst_ts_demux_parse_pes_header (demux, stream, data, size, packet->offset);
1391       break;
1392     }
1393     case PENDING_PACKET_BUFFER:
1394     {
1395       GST_LOG ("BUFFER: appending data");
1396       if (G_UNLIKELY (stream->current_size + size > stream->allocated_size)) {
1397         GST_LOG ("resizing buffer");
1398         stream->allocated_size = stream->allocated_size * 2;
1399         stream->data = g_realloc (stream->data, stream->allocated_size);
1400       }
1401       memcpy (stream->data + stream->current_size, data, size);
1402       stream->current_size += size;
1403       break;
1404     }
1405     case PENDING_PACKET_DISCONT:
1406     {
1407       GST_LOG ("DISCONT: not storing/pushing");
1408       if (G_UNLIKELY (stream->data)) {
1409         g_free (stream->data);
1410         stream->data = NULL;
1411       }
1412       stream->continuity_counter = CONTINUITY_UNSET;
1413       break;
1414     }
1415     default:
1416       break;
1417   }
1418
1419   return;
1420 }
1421
1422 static void
1423 calculate_and_push_newsegment (GstTSDemux * demux, TSDemuxStream * stream)
1424 {
1425   MpegTSBase *base = (MpegTSBase *) demux;
1426   GstClockTime lowest_pts = GST_CLOCK_TIME_NONE;
1427   GstClockTime firstts = 0;
1428   GList *tmp;
1429
1430   GST_DEBUG ("Creating new newsegment for stream %p", stream);
1431
1432   /* 0) If we don't have a time segment yet try to recover segment info from
1433    *    base when it's in time otherwise just initialize segment with
1434    *    defaults.
1435    *    It will happen only if it's first program or after flushes. */
1436   if (demux->segment.format == GST_FORMAT_UNDEFINED) {
1437     if (base->segment.format == GST_FORMAT_TIME) {
1438       demux->segment = base->segment;
1439       /* We can shortcut and create the segment event directly */
1440       demux->segment_event = gst_event_new_segment (&demux->segment);
1441     } else {
1442       gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1443     }
1444   }
1445
1446   /* 1) If we need to calculate an update newsegment, do it
1447    * 2) If we need to calculate a new newsegment, do it
1448    * 3) If an update_segment is valid, push it
1449    * 4) If a newsegment is valid, push it */
1450
1451   /* Speedup : if we don't need to calculate anything, go straight to pushing */
1452   if (!demux->calculate_update_segment && demux->segment_event)
1453     goto push_new_segment;
1454
1455   /* Calculate the 'new_start' value, used for both updates and newsegment */
1456   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
1457     TSDemuxStream *pstream = (TSDemuxStream *) tmp->data;
1458
1459     if (GST_CLOCK_TIME_IS_VALID (pstream->pts)) {
1460       if (!GST_CLOCK_TIME_IS_VALID (lowest_pts) || pstream->pts < lowest_pts)
1461         lowest_pts = pstream->pts;
1462     }
1463     if (GST_CLOCK_TIME_IS_VALID (pstream->dts)) {
1464       if (!GST_CLOCK_TIME_IS_VALID (lowest_pts) || pstream->dts < lowest_pts)
1465         lowest_pts = pstream->dts;
1466     }
1467   }
1468   if (GST_CLOCK_TIME_IS_VALID (lowest_pts))
1469     firstts =
1470         mpegts_packetizer_pts_to_ts (base->packetizer, lowest_pts,
1471         demux->program->pcr_pid);
1472   GST_DEBUG ("lowest_pts %" G_GUINT64_FORMAT " => clocktime %" GST_TIME_FORMAT,
1473       lowest_pts, GST_TIME_ARGS (firstts));
1474
1475   if (demux->calculate_update_segment) {
1476     GST_DEBUG ("Calculating update segment");
1477     /* If we have a valid segment, create an update of that */
1478     if (demux->segment.format == GST_FORMAT_TIME) {
1479       GstSegment update_segment;
1480       GST_DEBUG ("Re-using segment " SEGMENT_FORMAT,
1481           SEGMENT_ARGS (demux->segment));
1482       gst_segment_copy_into (&demux->segment, &update_segment);
1483       update_segment.stop = firstts;
1484       demux->update_segment = gst_event_new_segment (&update_segment);
1485     }
1486     demux->calculate_update_segment = FALSE;
1487   }
1488
1489   if (!demux->segment_event) {
1490     GstSegment new_segment;
1491
1492     GST_DEBUG ("Calculating actual segment");
1493
1494     gst_segment_copy_into (&demux->segment, &new_segment);
1495     if (new_segment.format != GST_FORMAT_TIME) {
1496       /* Start from the first ts/pts */
1497       new_segment.start = firstts;
1498       new_segment.stop = GST_CLOCK_TIME_NONE;
1499       new_segment.position = firstts;
1500     }
1501
1502     demux->segment_event = gst_event_new_segment (&new_segment);
1503   }
1504
1505 push_new_segment:
1506   if (demux->update_segment) {
1507     GST_DEBUG_OBJECT (stream->pad, "Pushing update segment");
1508     gst_event_ref (demux->update_segment);
1509     gst_pad_push_event (stream->pad, demux->update_segment);
1510   }
1511
1512   if (demux->segment_event) {
1513     GST_DEBUG_OBJECT (stream->pad, "Pushing newsegment event");
1514     gst_event_ref (demux->segment_event);
1515     gst_pad_push_event (stream->pad, demux->segment_event);
1516   }
1517
1518   /* Push pending tags */
1519   if (stream->taglist) {
1520     GST_DEBUG_OBJECT (stream->pad, "Sending tags %" GST_PTR_FORMAT,
1521         stream->taglist);
1522     gst_pad_push_event (stream->pad, gst_event_new_tag (stream->taglist));
1523     stream->taglist = NULL;
1524   }
1525
1526   stream->need_newsegment = FALSE;
1527 }
1528
1529 static GstFlowReturn
1530 gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
1531 {
1532   GstFlowReturn res = GST_FLOW_OK;
1533   MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
1534   GstBuffer *buffer = NULL;
1535   MpegTSPacketizer2 *packetizer = MPEG_TS_BASE_PACKETIZER (demux);
1536
1537   GST_DEBUG_OBJECT (stream->pad,
1538       "stream:%p, pid:0x%04x stream_type:%d state:%d", stream, bs->pid,
1539       bs->stream_type, stream->state);
1540
1541   if (G_UNLIKELY (stream->data == NULL)) {
1542     GST_LOG ("stream->data == NULL");
1543     goto beach;
1544   }
1545
1546   if (G_UNLIKELY (stream->state == PENDING_PACKET_EMPTY)) {
1547     GST_LOG ("EMPTY: returning");
1548     goto beach;
1549   }
1550
1551   if (G_UNLIKELY (stream->state != PENDING_PACKET_BUFFER)) {
1552     GST_LOG ("state:%d, returning", stream->state);
1553     goto beach;
1554   }
1555
1556   if (G_UNLIKELY (!stream->active))
1557     activate_pad_for_stream (demux, stream);
1558
1559   if (G_UNLIKELY (stream->pad == NULL)) {
1560     g_free (stream->data);
1561     goto beach;
1562   }
1563
1564   if (G_UNLIKELY (demux->program == NULL)) {
1565     GST_LOG_OBJECT (demux, "No program");
1566     g_free (stream->data);
1567     goto beach;
1568   }
1569
1570   if (G_UNLIKELY (stream->need_newsegment))
1571     calculate_and_push_newsegment (demux, stream);
1572
1573   buffer = gst_buffer_new_wrapped (stream->data, stream->current_size);
1574
1575   GST_DEBUG_OBJECT (stream->pad, "stream->pts %" GST_TIME_FORMAT,
1576       GST_TIME_ARGS (stream->pts));
1577   if (GST_CLOCK_TIME_IS_VALID (stream->pts))
1578     GST_BUFFER_PTS (buffer) =
1579         mpegts_packetizer_pts_to_ts (packetizer, stream->pts,
1580         demux->program->pcr_pid);
1581   if (GST_CLOCK_TIME_IS_VALID (stream->dts))
1582     GST_BUFFER_DTS (buffer) =
1583         mpegts_packetizer_pts_to_ts (packetizer, stream->dts,
1584         demux->program->pcr_pid);
1585
1586   GST_DEBUG_OBJECT (stream->pad,
1587       "Pushing buffer with PTS: %" GST_TIME_FORMAT " , DTS: %" GST_TIME_FORMAT,
1588       GST_TIME_ARGS (GST_BUFFER_PTS (buffer)),
1589       GST_TIME_ARGS (GST_BUFFER_DTS (buffer)));
1590
1591   res = gst_pad_push (stream->pad, buffer);
1592   GST_DEBUG_OBJECT (stream->pad, "Returned %s", gst_flow_get_name (res));
1593   res = tsdemux_combine_flows (demux, stream, res);
1594   GST_DEBUG_OBJECT (stream->pad, "combined %s", gst_flow_get_name (res));
1595
1596 beach:
1597   /* Reset everything */
1598   GST_LOG ("Resetting to EMPTY, returning %s", gst_flow_get_name (res));
1599   stream->state = PENDING_PACKET_EMPTY;
1600   stream->data = NULL;
1601   stream->expected_size = 0;
1602   stream->current_size = 0;
1603
1604   return res;
1605 }
1606
1607 static GstFlowReturn
1608 gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream,
1609     MpegTSPacketizerPacket * packet, MpegTSPacketizerSection * section)
1610 {
1611   GstFlowReturn res = GST_FLOW_OK;
1612
1613   GST_DEBUG ("data:%p", packet->data);
1614   GST_LOG ("pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p", packet->pid,
1615       packet->payload_unit_start_indicator, packet->adaptation_field_control,
1616       packet->continuity_counter, packet->payload);
1617
1618   if (section) {
1619     GST_DEBUG ("section complete:%d, buffer size %d",
1620         section->complete, section->section_length);
1621     return res;
1622   }
1623
1624   if (G_UNLIKELY (packet->payload_unit_start_indicator) &&
1625       packet->adaptation_field_control & 0x1)
1626     /* Flush previous data */
1627     res = gst_ts_demux_push_pending_data (demux, stream);
1628
1629   if (packet->payload && (res == GST_FLOW_OK || res == GST_FLOW_NOT_LINKED)
1630       && stream->pad) {
1631     gst_ts_demux_queue_data (demux, stream, packet);
1632     GST_DEBUG ("current_size:%d, expected_size:%d",
1633         stream->current_size, stream->expected_size);
1634     /* Finally check if the data we queued completes a packet */
1635     if (stream->expected_size && stream->current_size == stream->expected_size) {
1636       GST_LOG ("pushing complete packet");
1637       res = gst_ts_demux_push_pending_data (demux, stream);
1638     }
1639   }
1640
1641   return res;
1642 }
1643
1644 static void
1645 gst_ts_demux_flush (MpegTSBase * base)
1646 {
1647   GstTSDemux *demux = GST_TS_DEMUX_CAST (base);
1648
1649   gst_ts_demux_flush_streams (demux);
1650
1651   if (demux->segment_event) {
1652     gst_event_unref (demux->segment_event);
1653     demux->segment_event = NULL;
1654   }
1655   demux->calculate_update_segment = FALSE;
1656   gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
1657 }
1658
1659 static GstFlowReturn
1660 gst_ts_demux_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
1661     MpegTSPacketizerSection * section)
1662 {
1663   GstTSDemux *demux = GST_TS_DEMUX_CAST (base);
1664   TSDemuxStream *stream = NULL;
1665   GstFlowReturn res = GST_FLOW_OK;
1666
1667   if (G_LIKELY (demux->program)) {
1668     stream = (TSDemuxStream *) demux->program->streams[packet->pid];
1669
1670     if (stream) {
1671       res = gst_ts_demux_handle_packet (demux, stream, packet, section);
1672     }
1673   }
1674   return res;
1675 }
1676
1677 gboolean
1678 gst_ts_demux_plugin_init (GstPlugin * plugin)
1679 {
1680   GST_DEBUG_CATEGORY_INIT (ts_demux_debug, "tsdemux", 0,
1681       "MPEG transport stream demuxer");
1682   init_pes_parser ();
1683
1684   return gst_element_register (plugin, "tsdemux",
1685       GST_RANK_PRIMARY, GST_TYPE_TS_DEMUX);
1686 }