03a3fbabb37d2e8445eb8fa57e458a56a9a54c69
[platform/upstream/gstreamer.git] / gst / mpegtsdemux / tsdemux.c
1 /*
2  * tsdemux.c
3  * Copyright (C) 2009 Zaheer Abbas Merali
4  *               2010 Edward Hervey
5  * Copyright (C) 2011, Hewlett-Packard Development Company, L.P.
6  *  Author: Youness Alaoui <youness.alaoui@collabora.co.uk>, Collabora Ltd.
7  *  Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>, Collabora Ltd.
8  *  Author: Edward Hervey <bilboed@bilboed.com>, Collabora Ltd.
9  *
10  * Authors:
11  *   Zaheer Abbas Merali <zaheerabbas at merali dot org>
12  *   Edward Hervey <edward.hervey@collabora.co.uk>
13  *
14  * This library is free software; you can redistribute it and/or
15  * modify it under the terms of the GNU Library General Public
16  * License as published by the Free Software Foundation; either
17  * version 2 of the License, or (at your option) any later version.
18  *
19  * This library is distributed in the hope that it will be useful,
20  * but WITHOUT ANY WARRANTY; without even the implied warranty of
21  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22  * Library General Public License for more details.
23  *
24  * You should have received a copy of the GNU Library General Public
25  * License along with this library; if not, write to the
26  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
27  * Boston, MA 02110-1301, USA.
28  */
29
30 #ifdef HAVE_CONFIG_H
31 #include "config.h"
32 #endif
33
34 #include <stdlib.h>
35 #include <string.h>
36
37 #include <glib.h>
38 #include <gst/tag/tag.h>
39
40 #include "mpegtsbase.h"
41 #include "tsdemux.h"
42 #include "gstmpegdesc.h"
43 #include "gstmpegdefs.h"
44 #include "mpegtspacketizer.h"
45 #include "pesparse.h"
46
47 /*
48  * tsdemux
49  *
50  * See TODO for explanations on improvements needed
51  */
52
53 /* latency in mseconds */
54 #define TS_LATENCY 700
55
56 #define TABLE_ID_UNSET 0xFF
57
58 #define CONTINUITY_UNSET 255
59 #define MAX_CONTINUITY 15
60
61 #define PCR_WRAP_SIZE_128KBPS (((gint64)1490)*(1024*1024))
62 /* small PCR for wrap detection */
63 #define PCR_SMALL 17775000
64 /* maximal PCR time */
65 #define PCR_MAX_VALUE (((((guint64)1)<<33) * 300) + 298)
66 #define PTS_DTS_MAX_VALUE (((guint64)1) << 33)
67
68 /* Seeking/Scanning related variables */
69
70 /* seek to SEEK_TIMESTAMP_OFFSET before the desired offset and search then
71  * either accurately or for the next timestamp
72  */
73 #define SEEK_TIMESTAMP_OFFSET (500 * GST_MSECOND)
74
75 #define SEGMENT_FORMAT "[format:%s, rate:%f, start:%"                   \
76   GST_TIME_FORMAT", stop:%"GST_TIME_FORMAT", time:%"GST_TIME_FORMAT     \
77   ", base:%"GST_TIME_FORMAT", position:%"GST_TIME_FORMAT                \
78   ", duration:%"GST_TIME_FORMAT"]"
79
80 #define SEGMENT_ARGS(a) gst_format_get_name((a).format), (a).rate,      \
81     GST_TIME_ARGS((a).start), GST_TIME_ARGS((a).stop),                  \
82     GST_TIME_ARGS((a).time), GST_TIME_ARGS((a).base),                   \
83     GST_TIME_ARGS((a).position), GST_TIME_ARGS((a).duration)
84
85
86 GST_DEBUG_CATEGORY_STATIC (ts_demux_debug);
87 #define GST_CAT_DEFAULT ts_demux_debug
88
89 #define ABSDIFF(a,b) (((a) > (b)) ? ((a) - (b)) : ((b) - (a)))
90
91 static GQuark QUARK_TSDEMUX;
92 static GQuark QUARK_PID;
93 static GQuark QUARK_PCR;
94 static GQuark QUARK_OPCR;
95 static GQuark QUARK_PTS;
96 static GQuark QUARK_DTS;
97 static GQuark QUARK_OFFSET;
98
99 typedef enum
100 {
101   PENDING_PACKET_EMPTY = 0,     /* No pending packet/buffer
102                                  * Push incoming buffers to the array */
103   PENDING_PACKET_HEADER,        /* PES header needs to be parsed
104                                  * Push incoming buffers to the array */
105   PENDING_PACKET_BUFFER,        /* Currently filling up output buffer
106                                  * Push incoming buffers to the bufferlist */
107   PENDING_PACKET_DISCONT        /* Discontinuity in incoming packets
108                                  * Drop all incoming buffers */
109 } PendingPacketState;
110
111 typedef struct _TSDemuxStream TSDemuxStream;
112
113 struct _TSDemuxStream
114 {
115   MpegTSBaseStream stream;
116
117   GstPad *pad;
118   /* Whether the pad was added or not */
119   gboolean active;
120
121   /* the return of the latest push */
122   GstFlowReturn flow_return;
123
124   /* Output data */
125   PendingPacketState state;
126
127   /* Data to push (allocated) */
128   guint8 *data;
129
130   /* Size of data to push (if known) */
131   guint expected_size;
132
133   /* Size of currently queued data */
134   guint current_size;
135   guint allocated_size;
136
137   /* Current PTS/DTS for this stream */
138   GstClockTime pts;
139   GstClockTime dts;
140   /* Raw value of current PTS/DTS */
141   guint64 raw_pts;
142   guint64 raw_dts;
143   /* PTS/DTS with rollover fixed */
144   guint64 fixed_pts;
145   guint64 fixed_dts;
146   /* Number of rollover seen for PTS/DTS (default:0) */
147   guint nb_pts_rollover;
148   guint nb_dts_rollover;
149
150   /* Whether this stream needs to send a newsegment */
151   gboolean need_newsegment;
152
153   GstTagList *taglist;
154
155   gint continuity_counter;
156 };
157
158 #define VIDEO_CAPS \
159   GST_STATIC_CAPS (\
160     "video/mpeg, " \
161       "mpegversion = (int) { 1, 2, 4 }, " \
162       "systemstream = (boolean) FALSE; " \
163     "video/x-h264,stream-format=(string)byte-stream," \
164       "alignment=(string)nal;" \
165     "video/x-dirac;" \
166     "video/x-wmv," \
167       "wmvversion = (int) 3, " \
168       "format = (string) WVC1" \
169   )
170
171 #define AUDIO_CAPS \
172   GST_STATIC_CAPS ( \
173     "audio/mpeg, " \
174       "mpegversion = (int) 1;" \
175     "audio/mpeg, " \
176       "mpegversion = (int) 2, " \
177       "stream-format = (string) adts; " \
178     "audio/mpeg, " \
179       "mpegversion = (int) 4, " \
180       "stream-format = (string) loas; " \
181     "audio/x-lpcm, " \
182       "width = (int) { 16, 20, 24 }, " \
183       "rate = (int) { 48000, 96000 }, " \
184       "channels = (int) [ 1, 8 ], " \
185       "dynamic_range = (int) [ 0, 255 ], " \
186       "emphasis = (boolean) { FALSE, TRUE }, " \
187       "mute = (boolean) { FALSE, TRUE }; " \
188     "audio/x-ac3; audio/x-eac3;" \
189     "audio/x-dts;" \
190     "audio/x-private-ts-lpcm" \
191   )
192
193 /* Can also use the subpicture pads for text subtitles? */
194 #define SUBPICTURE_CAPS \
195     GST_STATIC_CAPS ("subpicture/x-pgs; subpicture/x-dvd")
196
197 static GstStaticPadTemplate video_template =
198 GST_STATIC_PAD_TEMPLATE ("video_%04x", GST_PAD_SRC,
199     GST_PAD_SOMETIMES,
200     VIDEO_CAPS);
201
202 static GstStaticPadTemplate audio_template =
203 GST_STATIC_PAD_TEMPLATE ("audio_%04x",
204     GST_PAD_SRC,
205     GST_PAD_SOMETIMES,
206     AUDIO_CAPS);
207
208 static GstStaticPadTemplate subpicture_template =
209 GST_STATIC_PAD_TEMPLATE ("subpicture_%04x",
210     GST_PAD_SRC,
211     GST_PAD_SOMETIMES,
212     SUBPICTURE_CAPS);
213
214 static GstStaticPadTemplate private_template =
215 GST_STATIC_PAD_TEMPLATE ("private_%04x",
216     GST_PAD_SRC,
217     GST_PAD_SOMETIMES,
218     GST_STATIC_CAPS_ANY);
219
220 enum
221 {
222   ARG_0,
223   PROP_PROGRAM_NUMBER,
224   PROP_EMIT_STATS,
225   /* FILL ME */
226 };
227
228 /* Pad functions */
229
230
231 /* mpegtsbase methods */
232 static void
233 gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program);
234 static void
235 gst_ts_demux_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program);
236 static void gst_ts_demux_reset (MpegTSBase * base);
237 static GstFlowReturn
238 gst_ts_demux_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
239     MpegTSPacketizerSection * section);
240 static void gst_ts_demux_flush (MpegTSBase * base);
241 static void
242 gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * stream,
243     MpegTSBaseProgram * program);
244 static void
245 gst_ts_demux_stream_removed (MpegTSBase * base, MpegTSBaseStream * stream);
246 static GstFlowReturn gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event);
247 static void gst_ts_demux_set_property (GObject * object, guint prop_id,
248     const GValue * value, GParamSpec * pspec);
249 static void gst_ts_demux_get_property (GObject * object, guint prop_id,
250     GValue * value, GParamSpec * pspec);
251 static void gst_ts_demux_flush_streams (GstTSDemux * tsdemux);
252 static GstFlowReturn
253 gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream);
254 static void gst_ts_demux_stream_flush (TSDemuxStream * stream);
255
256 static gboolean push_event (MpegTSBase * base, GstEvent * event);
257
258 static void
259 _extra_init (void)
260 {
261   QUARK_TSDEMUX = g_quark_from_string ("tsdemux");
262   QUARK_PID = g_quark_from_string ("pid");
263   QUARK_PCR = g_quark_from_string ("pcr");
264   QUARK_OPCR = g_quark_from_string ("opcr");
265   QUARK_PTS = g_quark_from_string ("pts");
266   QUARK_DTS = g_quark_from_string ("dts");
267   QUARK_OFFSET = g_quark_from_string ("offset");
268 }
269
270 #define gst_ts_demux_parent_class parent_class
271 G_DEFINE_TYPE_WITH_CODE (GstTSDemux, gst_ts_demux, GST_TYPE_MPEGTS_BASE,
272     _extra_init ());
273
274 static void
275 gst_ts_demux_class_init (GstTSDemuxClass * klass)
276 {
277   GObjectClass *gobject_class;
278   GstElementClass *element_class;
279   MpegTSBaseClass *ts_class;
280
281   gobject_class = G_OBJECT_CLASS (klass);
282   gobject_class->set_property = gst_ts_demux_set_property;
283   gobject_class->get_property = gst_ts_demux_get_property;
284
285   g_object_class_install_property (gobject_class, PROP_PROGRAM_NUMBER,
286       g_param_spec_int ("program-number", "Program number",
287           "Program Number to demux for (-1 to ignore)", -1, G_MAXINT,
288           -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
289
290   g_object_class_install_property (gobject_class, PROP_EMIT_STATS,
291       g_param_spec_boolean ("emit-stats", "Emit statistics",
292           "Emit messages for every pcr/opcr/pts/dts", FALSE,
293           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
294
295   element_class = GST_ELEMENT_CLASS (klass);
296   gst_element_class_add_pad_template (element_class,
297       gst_static_pad_template_get (&video_template));
298   gst_element_class_add_pad_template (element_class,
299       gst_static_pad_template_get (&audio_template));
300   gst_element_class_add_pad_template (element_class,
301       gst_static_pad_template_get (&subpicture_template));
302   gst_element_class_add_pad_template (element_class,
303       gst_static_pad_template_get (&private_template));
304
305   gst_element_class_set_static_metadata (element_class,
306       "MPEG transport stream demuxer",
307       "Codec/Demuxer",
308       "Demuxes MPEG2 transport streams",
309       "Zaheer Abbas Merali <zaheerabbas at merali dot org>\n"
310       "Edward Hervey <edward.hervey@collabora.co.uk>");
311
312   ts_class = GST_MPEGTS_BASE_CLASS (klass);
313   ts_class->reset = GST_DEBUG_FUNCPTR (gst_ts_demux_reset);
314   ts_class->push = GST_DEBUG_FUNCPTR (gst_ts_demux_push);
315   ts_class->push_event = GST_DEBUG_FUNCPTR (push_event);
316   ts_class->program_started = GST_DEBUG_FUNCPTR (gst_ts_demux_program_started);
317   ts_class->program_stopped = GST_DEBUG_FUNCPTR (gst_ts_demux_program_stopped);
318   ts_class->stream_added = gst_ts_demux_stream_added;
319   ts_class->stream_removed = gst_ts_demux_stream_removed;
320   ts_class->seek = GST_DEBUG_FUNCPTR (gst_ts_demux_do_seek);
321   ts_class->flush = GST_DEBUG_FUNCPTR (gst_ts_demux_flush);
322 }
323
324 static void
325 gst_ts_demux_reset (MpegTSBase * base)
326 {
327   GstTSDemux *demux = (GstTSDemux *) base;
328
329   demux->program_number = -1;
330   demux->calculate_update_segment = FALSE;
331
332   gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
333   if (demux->segment_event) {
334     gst_event_unref (demux->segment_event);
335     demux->segment_event = NULL;
336   }
337
338   if (demux->update_segment) {
339     gst_event_unref (demux->update_segment);
340     demux->update_segment = NULL;
341   }
342 }
343
344 static void
345 gst_ts_demux_init (GstTSDemux * demux)
346 {
347   GST_MPEGTS_BASE (demux)->stream_size = sizeof (TSDemuxStream);
348
349   gst_ts_demux_reset ((MpegTSBase *) demux);
350 }
351
352
353 static void
354 gst_ts_demux_set_property (GObject * object, guint prop_id,
355     const GValue * value, GParamSpec * pspec)
356 {
357   GstTSDemux *demux = GST_TS_DEMUX (object);
358
359   switch (prop_id) {
360     case PROP_PROGRAM_NUMBER:
361       /* FIXME: do something if program is switched as opposed to set at
362        * beginning */
363       demux->program_number = g_value_get_int (value);
364       break;
365     case PROP_EMIT_STATS:
366       demux->emit_statistics = g_value_get_boolean (value);
367       break;
368     default:
369       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
370   }
371 }
372
373 static void
374 gst_ts_demux_get_property (GObject * object, guint prop_id,
375     GValue * value, GParamSpec * pspec)
376 {
377   GstTSDemux *demux = GST_TS_DEMUX (object);
378
379   switch (prop_id) {
380     case PROP_PROGRAM_NUMBER:
381       g_value_set_int (value, demux->program_number);
382       break;
383     case PROP_EMIT_STATS:
384       g_value_set_boolean (value, demux->emit_statistics);
385       break;
386     default:
387       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
388   }
389 }
390
391 static gboolean
392 gst_ts_demux_srcpad_query (GstPad * pad, GstObject * parent, GstQuery * query)
393 {
394   gboolean res = TRUE;
395   GstFormat format;
396   GstTSDemux *demux;
397   MpegTSBase *base;
398
399   demux = GST_TS_DEMUX (parent);
400   base = GST_MPEGTS_BASE (demux);
401
402   switch (GST_QUERY_TYPE (query)) {
403     case GST_QUERY_DURATION:
404     {
405       GST_DEBUG ("query duration");
406       gst_query_parse_duration (query, &format, NULL);
407       if (format == GST_FORMAT_TIME) {
408         if (!gst_pad_peer_query (base->sinkpad, query)) {
409           gint64 val;
410
411           format = GST_FORMAT_BYTES;
412           if (!gst_pad_peer_query_duration (base->sinkpad, format, &val))
413             res = FALSE;
414           else {
415             GstClockTime dur =
416                 mpegts_packetizer_offset_to_ts (base->packetizer, val,
417                 demux->program->pcr_pid);
418             if (GST_CLOCK_TIME_IS_VALID (dur))
419               gst_query_set_duration (query, GST_FORMAT_TIME, dur);
420             else
421               res = FALSE;
422           }
423         }
424       } else {
425         GST_DEBUG_OBJECT (demux, "only query duration on TIME is supported");
426         res = FALSE;
427       }
428       break;
429     }
430     case GST_QUERY_LATENCY:
431     {
432       GST_DEBUG ("query latency");
433       res = gst_pad_peer_query (base->sinkpad, query);
434       if (res && base->upstream_live) {
435         GstClockTime min_lat, max_lat;
436         gboolean live;
437
438         /* According to H.222.0
439            Annex D.0.3 (System Time Clock recovery in the decoder)
440            and D.0.2 (Audio and video presentation synchronization)
441
442            We can end up with an interval of up to 700ms between valid
443            PCR/SCR. We therefore allow a latency of 700ms for that.
444          */
445         gst_query_parse_latency (query, &live, &min_lat, &max_lat);
446         if (min_lat != -1)
447           min_lat += 700 * GST_MSECOND;
448         if (max_lat != -1)
449           max_lat += 700 * GST_MSECOND;
450         gst_query_set_latency (query, live, min_lat, max_lat);
451       }
452       break;
453     }
454     case GST_QUERY_SEEKING:
455     {
456       GST_DEBUG ("query seeking");
457       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
458       if (format == GST_FORMAT_TIME) {
459         gboolean seekable = FALSE;
460
461         if (gst_pad_peer_query (base->sinkpad, query))
462           gst_query_parse_seeking (query, NULL, &seekable, NULL, NULL);
463
464         /* If upstream is not seekable in TIME format we use
465          * our own values here */
466         if (!seekable)
467           gst_query_set_seeking (query, GST_FORMAT_TIME,
468               demux->parent.mode != BASE_MODE_PUSHING, 0,
469               demux->segment.duration);
470       } else {
471         GST_DEBUG_OBJECT (demux, "only TIME is supported for query seeking");
472         res = FALSE;
473       }
474       break;
475     }
476     default:
477       res = gst_pad_query_default (pad, parent, query);
478   }
479
480   return res;
481
482 }
483
484 static GstFlowReturn
485 gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event)
486 {
487   GstTSDemux *demux = (GstTSDemux *) base;
488   GstFlowReturn res = GST_FLOW_ERROR;
489   gdouble rate;
490   GstFormat format;
491   GstSeekFlags flags;
492   GstSeekType start_type, stop_type;
493   gint64 start, stop;
494   GstSegment seeksegment;
495   gboolean update;
496   guint64 start_offset;
497
498   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
499       &stop_type, &stop);
500
501   if (format != GST_FORMAT_TIME) {
502     goto done;
503   }
504
505   GST_DEBUG ("seek event, rate: %f start: %" GST_TIME_FORMAT
506       " stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start),
507       GST_TIME_ARGS (stop));
508
509   if (flags & (GST_SEEK_FLAG_SEGMENT | GST_SEEK_FLAG_SKIP)) {
510     GST_WARNING ("seek flags 0x%x are not supported", (int) flags);
511     goto done;
512   }
513
514   /* copy segment, we need this because we still need the old
515    * segment when we close the current segment. */
516   memcpy (&seeksegment, &demux->segment, sizeof (GstSegment));
517   if (demux->segment_event) {
518     gst_event_unref (demux->segment_event);
519     demux->segment_event = NULL;
520   }
521   /* configure the segment with the seek variables */
522   GST_DEBUG_OBJECT (demux, "configuring seek");
523   GST_DEBUG ("seeksegment before set_seek " SEGMENT_FORMAT,
524       SEGMENT_ARGS (seeksegment));
525
526   gst_segment_do_seek (&seeksegment, rate, format, flags, start_type, start,
527       stop_type, stop, &update);
528
529   GST_DEBUG ("seeksegment after set_seek " SEGMENT_FORMAT,
530       SEGMENT_ARGS (seeksegment));
531
532   /* Convert start/stop to offset */
533   start_offset =
534       mpegts_packetizer_ts_to_offset (base->packetizer, MAX (0,
535           start - SEEK_TIMESTAMP_OFFSET), demux->program->pcr_pid);
536
537   if (G_UNLIKELY (start_offset == -1)) {
538     GST_WARNING ("Couldn't convert start position to an offset");
539     goto done;
540   }
541
542   /* record offset */
543   base->seek_offset = start_offset;
544   res = GST_FLOW_OK;
545
546   /* commit the new segment */
547   memcpy (&demux->segment, &seeksegment, sizeof (GstSegment));
548
549   if (demux->segment.flags & GST_SEEK_FLAG_SEGMENT) {
550     gst_element_post_message (GST_ELEMENT_CAST (demux),
551         gst_message_new_segment_start (GST_OBJECT_CAST (demux),
552             demux->segment.format, demux->segment.stop));
553   }
554
555 done:
556   return res;
557 }
558
559 static gboolean
560 gst_ts_demux_srcpad_event (GstPad * pad, GstObject * parent, GstEvent * event)
561 {
562   gboolean res = TRUE;
563   GstTSDemux *demux = GST_TS_DEMUX (parent);
564
565   GST_DEBUG_OBJECT (pad, "Got event %s",
566       gst_event_type_get_name (GST_EVENT_TYPE (event)));
567
568   switch (GST_EVENT_TYPE (event)) {
569     case GST_EVENT_SEEK:
570       res = mpegts_base_handle_seek_event ((MpegTSBase *) demux, pad, event);
571       if (!res)
572         GST_WARNING ("seeking failed");
573       gst_event_unref (event);
574       break;
575     default:
576       res = gst_pad_event_default (pad, parent, event);
577   }
578
579   return res;
580 }
581
582 static gboolean
583 push_event (MpegTSBase * base, GstEvent * event)
584 {
585   GstTSDemux *demux = (GstTSDemux *) base;
586   GList *tmp;
587
588   if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
589     GST_DEBUG_OBJECT (base, "Ignoring segment event (recreated later)");
590     gst_event_unref (event);
591     return TRUE;
592   }
593
594   if (G_UNLIKELY (demux->program == NULL)) {
595     gst_event_unref (event);
596     return FALSE;
597   }
598
599   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
600     TSDemuxStream *stream = (TSDemuxStream *) tmp->data;
601     if (stream->pad) {
602       gst_event_ref (event);
603       gst_pad_push_event (stream->pad, event);
604     }
605   }
606
607   gst_event_unref (event);
608
609   return TRUE;
610 }
611
612 static GstFlowReturn
613 tsdemux_combine_flows (GstTSDemux * demux, TSDemuxStream * stream,
614     GstFlowReturn ret)
615 {
616   GList *tmp;
617
618   /* Store the value */
619   stream->flow_return = ret;
620
621   /* any other error that is not-linked can be returned right away */
622   if (ret != GST_FLOW_NOT_LINKED)
623     goto done;
624
625   /* Only return NOT_LINKED if all other pads returned NOT_LINKED */
626   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
627     stream = (TSDemuxStream *) tmp->data;
628     if (stream->pad) {
629       ret = stream->flow_return;
630       /* some other return value (must be SUCCESS but we can return
631        * other values as well) */
632       if (ret != GST_FLOW_NOT_LINKED)
633         goto done;
634     }
635     /* if we get here, all other pads were unlinked and we return
636      * NOT_LINKED then */
637   }
638
639 done:
640   return ret;
641 }
642
643 static void
644 gst_ts_demux_create_tags (TSDemuxStream * stream)
645 {
646   guint8 *desc = NULL;
647   int i;
648
649   desc = mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
650       DESC_ISO_639_LANGUAGE);
651
652   if (!desc) {
653     desc = mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
654         DESC_DVB_SUBTITLING);
655   }
656
657   if (desc) {
658     if (!stream->taglist)
659       stream->taglist = gst_tag_list_new_empty ();
660
661     for (i = 0; i < DESC_ISO_639_LANGUAGE_codes_n (desc); i++) {
662       const gchar *lc;
663       gchar lang_code[4];
664       gchar *language_n;
665
666       language_n = (gchar *)
667           DESC_ISO_639_LANGUAGE_language_code_nth (desc, i);
668
669       /* Language codes should be 3 character long, we allow
670        * a bit more flexibility by allowing 2 characters. */
671       if (!language_n[0] || !language_n[1])
672         continue;
673
674       GST_LOG ("Add language code for stream: %s", language_n);
675
676       lang_code[0] = language_n[0];
677       lang_code[1] = language_n[1];
678       lang_code[2] = language_n[2];
679       lang_code[3] = 0;
680
681       /* descriptor contains ISO 639-2 code, we want the ISO 639-1 code */
682       lc = gst_tag_get_language_code (lang_code);
683       gst_tag_list_add (stream->taglist, GST_TAG_MERGE_REPLACE,
684           GST_TAG_LANGUAGE_CODE, (lc) ? lc : lang_code, NULL);
685     }
686
687     g_free (desc);
688   }
689 }
690
691 static GstPad *
692 create_pad_for_stream (MpegTSBase * base, MpegTSBaseStream * bstream,
693     MpegTSBaseProgram * program)
694 {
695   TSDemuxStream *stream = (TSDemuxStream *) bstream;
696   gchar *name = NULL;
697   GstCaps *caps = NULL;
698   GstPadTemplate *template = NULL;
699   guint8 *desc = NULL;
700   GstPad *pad = NULL;
701
702   gst_ts_demux_create_tags (stream);
703
704   GST_LOG ("Attempting to create pad for stream 0x%04x with stream_type %d",
705       bstream->pid, bstream->stream_type);
706
707   switch (bstream->stream_type) {
708     case ST_VIDEO_MPEG1:
709     case ST_VIDEO_MPEG2:
710       GST_LOG ("mpeg video");
711       template = gst_static_pad_template_get (&video_template);
712       name = g_strdup_printf ("video_%04x", bstream->pid);
713       caps = gst_caps_new_simple ("video/mpeg",
714           "mpegversion", G_TYPE_INT,
715           bstream->stream_type == ST_VIDEO_MPEG1 ? 1 : 2, "systemstream",
716           G_TYPE_BOOLEAN, FALSE, NULL);
717
718       break;
719     case ST_AUDIO_MPEG1:
720     case ST_AUDIO_MPEG2:
721       GST_LOG ("mpeg audio");
722       template = gst_static_pad_template_get (&audio_template);
723       name = g_strdup_printf ("audio_%04x", bstream->pid);
724       caps =
725           gst_caps_new_simple ("audio/mpeg", "mpegversion", G_TYPE_INT, 1,
726           NULL);
727       break;
728     case ST_PRIVATE_DATA:
729       GST_LOG ("private data");
730       desc = mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
731           DESC_DVB_AC3);
732       if (desc) {
733         GST_LOG ("ac3 audio");
734         template = gst_static_pad_template_get (&audio_template);
735         name = g_strdup_printf ("audio_%04x", bstream->pid);
736         caps = gst_caps_new_empty_simple ("audio/x-ac3");
737         g_free (desc);
738         break;
739       }
740
741       desc = mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
742           DESC_DVB_ENHANCED_AC3);
743       if (desc) {
744         GST_LOG ("ac3 audio");
745         template = gst_static_pad_template_get (&audio_template);
746         name = g_strdup_printf ("audio_%04x", bstream->pid);
747         caps = gst_caps_new_empty_simple ("audio/x-eac3");
748         g_free (desc);
749         break;
750       }
751       desc = mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
752           DESC_DVB_TELETEXT);
753       if (desc) {
754         GST_LOG ("teletext");
755         template = gst_static_pad_template_get (&private_template);
756         name = g_strdup_printf ("private_%04x", bstream->pid);
757         caps = gst_caps_new_empty_simple ("private/teletext");
758         g_free (desc);
759         break;
760       }
761       desc =
762           mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
763           DESC_DVB_SUBTITLING);
764       if (desc) {
765         GST_LOG ("subtitling");
766         template = gst_static_pad_template_get (&private_template);
767         name = g_strdup_printf ("private_%04x", bstream->pid);
768         caps = gst_caps_new_empty_simple ("subpicture/x-dvb");
769         g_free (desc);
770         break;
771       }
772
773       desc = mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
774           DESC_REGISTRATION);
775       if (desc) {
776         switch (DESC_REGISTRATION_format_identifier (desc)) {
777           case DRF_ID_DTS1:
778           case DRF_ID_DTS2:
779           case DRF_ID_DTS3:
780             /* SMPTE registered DTS */
781             GST_LOG ("subtitling");
782             template = gst_static_pad_template_get (&private_template);
783             name = g_strdup_printf ("private_%04x", bstream->pid);
784             caps = gst_caps_new_empty_simple ("audio/x-dts");
785             break;
786           case DRF_ID_S302M:
787             template = gst_static_pad_template_get (&audio_template);
788             name = g_strdup_printf ("audio_%04x", bstream->pid);
789             caps = gst_caps_new_empty_simple ("audio/x-smpte-302m");
790             break;
791         }
792         g_free (desc);
793       }
794       if (template)
795         break;
796
797       /* hack for itv hd (sid 10510, video pid 3401 */
798       if (program->program_number == 10510 && bstream->pid == 3401) {
799         template = gst_static_pad_template_get (&video_template);
800         name = g_strdup_printf ("video_%04x", bstream->pid);
801         caps = gst_caps_new_simple ("video/x-h264",
802             "stream-format", G_TYPE_STRING, "byte-stream",
803             "alignment", G_TYPE_STRING, "nal", NULL);
804       }
805       break;
806     case ST_HDV_AUX_V:
807       /* We don't expose those streams since they're only helper streams */
808       /* template = gst_static_pad_template_get (&private_template); */
809       /* name = g_strdup_printf ("private_%04x", bstream->pid); */
810       /* caps = gst_caps_new_simple ("hdv/aux-v", NULL); */
811       break;
812     case ST_HDV_AUX_A:
813       /* We don't expose those streams since they're only helper streams */
814       /* template = gst_static_pad_template_get (&private_template); */
815       /* name = g_strdup_printf ("private_%04x", bstream->pid); */
816       /* caps = gst_caps_new_simple ("hdv/aux-a", NULL); */
817       break;
818     case ST_PRIVATE_SECTIONS:
819     case ST_MHEG:
820     case ST_DSMCC:
821     case ST_DSMCC_A:
822     case ST_DSMCC_B:
823     case ST_DSMCC_C:
824     case ST_DSMCC_D:
825       MPEGTS_BIT_UNSET (base->is_pes, bstream->pid);
826       break;
827     case ST_AUDIO_AAC_ADTS:
828       template = gst_static_pad_template_get (&audio_template);
829       name = g_strdup_printf ("audio_%04x", bstream->pid);
830       caps = gst_caps_new_simple ("audio/mpeg",
831           "mpegversion", G_TYPE_INT, 2,
832           "stream-format", G_TYPE_STRING, "adts", NULL);
833       break;
834     case ST_AUDIO_AAC_LATM:
835       template = gst_static_pad_template_get (&audio_template);
836       name = g_strdup_printf ("audio_%04x", bstream->pid);
837       caps = gst_caps_new_simple ("audio/mpeg",
838           "mpegversion", G_TYPE_INT, 4,
839           "stream-format", G_TYPE_STRING, "loas", NULL);
840       break;
841     case ST_VIDEO_MPEG4:
842       template = gst_static_pad_template_get (&video_template);
843       name = g_strdup_printf ("video_%04x", bstream->pid);
844       caps = gst_caps_new_simple ("video/mpeg",
845           "mpegversion", G_TYPE_INT, 4,
846           "systemstream", G_TYPE_BOOLEAN, FALSE, NULL);
847       break;
848     case ST_VIDEO_H264:
849       template = gst_static_pad_template_get (&video_template);
850       name = g_strdup_printf ("video_%04x", bstream->pid);
851       caps = gst_caps_new_simple ("video/x-h264",
852           "stream-format", G_TYPE_STRING, "byte-stream",
853           "alignment", G_TYPE_STRING, "nal", NULL);
854       break;
855     case ST_VIDEO_DIRAC:
856       desc =
857           mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
858           DESC_REGISTRATION);
859       if (desc) {
860         if (DESC_LENGTH (desc) >= 4) {
861           if (DESC_REGISTRATION_format_identifier (desc) == 0x64726163) {
862             GST_LOG ("dirac");
863             /* dirac in hex */
864             template = gst_static_pad_template_get (&video_template);
865             name = g_strdup_printf ("video_%04x", bstream->pid);
866             caps = gst_caps_new_empty_simple ("video/x-dirac");
867           }
868         }
869         g_free (desc);
870       }
871       break;
872     case ST_PRIVATE_EA:        /* Try to detect a VC1 stream */
873     {
874       gboolean is_vc1 = FALSE;
875       desc =
876           mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
877           DESC_REGISTRATION);
878       if (desc) {
879         if (DESC_LENGTH (desc) >= 4) {
880           if (DESC_REGISTRATION_format_identifier (desc) == DRF_ID_VC1) {
881             is_vc1 = TRUE;
882           }
883         }
884         g_free (desc);
885       }
886       if (!is_vc1) {
887         GST_WARNING ("0xea private stream type found but no descriptor "
888             "for VC1. Assuming plain VC1.");
889       }
890
891       template = gst_static_pad_template_get (&video_template);
892       name = g_strdup_printf ("video_%04x", bstream->pid);
893       caps = gst_caps_new_simple ("video/x-wmv",
894           "wmvversion", G_TYPE_INT, 3, "format", G_TYPE_STRING, "WVC1", NULL);
895
896       break;
897     }
898     case ST_BD_AUDIO_AC3:
899     {
900       /* REGISTRATION DRF_ID_HDMV */
901       desc = mpegts_get_descriptor_from_program (program, DESC_REGISTRATION);
902       if (desc) {
903         if (DESC_REGISTRATION_format_identifier (desc) == DRF_ID_HDMV) {
904           guint8 *ac3_desc;
905
906           /* ATSC ac3 audio descriptor */
907           ac3_desc =
908               mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
909               DESC_AC3_AUDIO_STREAM);
910           if (ac3_desc && DESC_AC_AUDIO_STREAM_bsid (ac3_desc) != 16) {
911             GST_LOG ("ac3 audio");
912             template = gst_static_pad_template_get (&audio_template);
913             name = g_strdup_printf ("audio_%04x", bstream->pid);
914             caps = gst_caps_new_empty_simple ("audio/x-ac3");
915
916             g_free (ac3_desc);
917           } else {
918             template = gst_static_pad_template_get (&audio_template);
919             name = g_strdup_printf ("audio_%04x", bstream->pid);
920             caps = gst_caps_new_empty_simple ("audio/x-eac3");
921           }
922
923         }
924
925         g_free (desc);
926       }
927       if (template)
928         break;
929
930
931       /* DVB_ENHANCED_AC3 */
932       desc = mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
933           DESC_DVB_ENHANCED_AC3);
934       if (desc) {
935         template = gst_static_pad_template_get (&audio_template);
936         name = g_strdup_printf ("audio_%04x", bstream->pid);
937         caps = gst_caps_new_empty_simple ("audio/x-eac3");
938         g_free (desc);
939         break;
940       }
941
942       /* DVB_AC3 */
943       desc =
944           mpegts_get_descriptor_from_stream ((MpegTSBaseStream *) stream,
945           DESC_DVB_AC3);
946       if (!desc)
947         GST_WARNING ("AC3 stream type found but no corresponding "
948             "descriptor to differentiate between AC3 and EAC3. "
949             "Assuming plain AC3.");
950       else
951         g_free (desc);
952       template = gst_static_pad_template_get (&audio_template);
953       name = g_strdup_printf ("audio_%04x", bstream->pid);
954       caps = gst_caps_new_empty_simple ("audio/x-ac3");
955       break;
956     }
957     case ST_BD_AUDIO_EAC3:
958       template = gst_static_pad_template_get (&audio_template);
959       name = g_strdup_printf ("audio_%04x", bstream->pid);
960       caps = gst_caps_new_empty_simple ("audio/x-eac3");
961       break;
962     case ST_BD_AUDIO_AC3_TRUE_HD:
963       template = gst_static_pad_template_get (&audio_template);
964       name = g_strdup_printf ("audio_%04x", bstream->pid);
965       caps = gst_caps_new_empty_simple ("audio/x-true-hd");
966       break;
967     case ST_PS_AUDIO_DTS:
968       template = gst_static_pad_template_get (&audio_template);
969       name = g_strdup_printf ("audio_%04x", bstream->pid);
970       caps = gst_caps_new_empty_simple ("audio/x-dts");
971       break;
972     case ST_PS_AUDIO_LPCM:
973       template = gst_static_pad_template_get (&audio_template);
974       name = g_strdup_printf ("audio_%04x", bstream->pid);
975       caps = gst_caps_new_empty_simple ("audio/x-lpcm");
976       break;
977     case ST_BD_AUDIO_LPCM:
978       template = gst_static_pad_template_get (&audio_template);
979       name = g_strdup_printf ("audio_%04x", bstream->pid);
980       caps = gst_caps_new_empty_simple ("audio/x-private-ts-lpcm");
981       break;
982     case ST_PS_DVD_SUBPICTURE:
983       template = gst_static_pad_template_get (&subpicture_template);
984       name = g_strdup_printf ("subpicture_%04x", bstream->pid);
985       caps = gst_caps_new_empty_simple ("subpicture/x-dvd");
986       break;
987     case ST_BD_PGS_SUBPICTURE:
988       template = gst_static_pad_template_get (&subpicture_template);
989       name = g_strdup_printf ("subpicture_%04x", bstream->pid);
990       caps = gst_caps_new_empty_simple ("subpicture/x-pgs");
991       break;
992     default:
993       GST_WARNING ("Non-media stream (stream_type:0x%x). Not creating pad",
994           bstream->stream_type);
995       break;
996   }
997   if (template && name && caps) {
998     gchar *stream_id;
999
1000     GST_LOG ("stream:%p creating pad with name %s and caps %s", stream, name,
1001         gst_caps_to_string (caps));
1002     pad = gst_pad_new_from_template (template, name);
1003     gst_pad_set_active (pad, TRUE);
1004     gst_pad_use_fixed_caps (pad);
1005     stream_id =
1006         gst_pad_create_stream_id_printf (pad, GST_ELEMENT_CAST (base), "%08x",
1007         bstream->pid);
1008     gst_pad_push_event (pad, gst_event_new_stream_start (stream_id));
1009     g_free (stream_id);
1010     gst_pad_set_caps (pad, caps);
1011     gst_pad_set_query_function (pad, gst_ts_demux_srcpad_query);
1012     gst_pad_set_event_function (pad, gst_ts_demux_srcpad_event);
1013   }
1014
1015   if (name)
1016     g_free (name);
1017   if (template)
1018     gst_object_unref (template);
1019   if (caps)
1020     gst_caps_unref (caps);
1021
1022   return pad;
1023 }
1024
1025 static void
1026 gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * bstream,
1027     MpegTSBaseProgram * program)
1028 {
1029   TSDemuxStream *stream = (TSDemuxStream *) bstream;
1030
1031   if (!stream->pad) {
1032     /* Create the pad */
1033     if (bstream->stream_type != 0xff)
1034       stream->pad = create_pad_for_stream (base, bstream, program);
1035     stream->active = FALSE;
1036
1037     stream->need_newsegment = TRUE;
1038     stream->pts = GST_CLOCK_TIME_NONE;
1039     stream->dts = GST_CLOCK_TIME_NONE;
1040     stream->raw_pts = 0;
1041     stream->raw_dts = 0;
1042     stream->fixed_pts = 0;
1043     stream->fixed_dts = 0;
1044     stream->nb_pts_rollover = 0;
1045     stream->nb_dts_rollover = 0;
1046     stream->continuity_counter = CONTINUITY_UNSET;
1047   }
1048   stream->flow_return = GST_FLOW_OK;
1049 }
1050
1051 static void
1052 gst_ts_demux_stream_removed (MpegTSBase * base, MpegTSBaseStream * bstream)
1053 {
1054   TSDemuxStream *stream = (TSDemuxStream *) bstream;
1055
1056   if (stream->pad) {
1057     if (stream->active && gst_pad_is_active (stream->pad)) {
1058       /* Flush out all data */
1059       GST_DEBUG_OBJECT (stream->pad, "Flushing out pending data");
1060       gst_ts_demux_push_pending_data ((GstTSDemux *) base, stream);
1061
1062       GST_DEBUG_OBJECT (stream->pad, "Pushing out EOS");
1063       gst_pad_push_event (stream->pad, gst_event_new_eos ());
1064       GST_DEBUG_OBJECT (stream->pad, "Deactivating and removing pad");
1065       gst_pad_set_active (stream->pad, FALSE);
1066       gst_element_remove_pad (GST_ELEMENT_CAST (base), stream->pad);
1067       stream->active = FALSE;
1068     }
1069     stream->pad = NULL;
1070   }
1071   gst_ts_demux_stream_flush (stream);
1072   stream->flow_return = GST_FLOW_NOT_LINKED;
1073 }
1074
1075 static void
1076 activate_pad_for_stream (GstTSDemux * tsdemux, TSDemuxStream * stream)
1077 {
1078   GList *tmp;
1079   gboolean alldone = TRUE;
1080
1081   if (stream->pad) {
1082     GST_DEBUG_OBJECT (tsdemux, "Activating pad %s:%s for stream %p",
1083         GST_DEBUG_PAD_NAME (stream->pad), stream);
1084     gst_element_add_pad ((GstElement *) tsdemux, stream->pad);
1085     stream->active = TRUE;
1086     GST_DEBUG_OBJECT (stream->pad, "done adding pad");
1087
1088     /* Check if all pads were activated, and if so emit no-more-pads */
1089     for (tmp = tsdemux->program->stream_list; tmp; tmp = tmp->next) {
1090       stream = (TSDemuxStream *) tmp->data;
1091       if (stream->pad && !stream->active)
1092         alldone = FALSE;
1093     }
1094     if (alldone) {
1095       GST_DEBUG_OBJECT (tsdemux, "All pads were activated, emit no-more-pads");
1096       gst_element_no_more_pads ((GstElement *) tsdemux);
1097     }
1098   } else
1099     GST_WARNING_OBJECT (tsdemux,
1100         "stream %p (pid 0x%04x, type:0x%03x) has no pad", stream,
1101         ((MpegTSBaseStream *) stream)->pid,
1102         ((MpegTSBaseStream *) stream)->stream_type);
1103 }
1104
1105 static void
1106 gst_ts_demux_stream_flush (TSDemuxStream * stream)
1107 {
1108   stream->pts = GST_CLOCK_TIME_NONE;
1109
1110   GST_DEBUG ("flushing stream %p", stream);
1111
1112   if (stream->data)
1113     g_free (stream->data);
1114   stream->data = NULL;
1115   stream->state = PENDING_PACKET_EMPTY;
1116   stream->expected_size = 0;
1117   stream->allocated_size = 0;
1118   stream->current_size = 0;
1119   stream->need_newsegment = TRUE;
1120   stream->pts = GST_CLOCK_TIME_NONE;
1121   stream->dts = GST_CLOCK_TIME_NONE;
1122   stream->raw_pts = 0;
1123   stream->raw_dts = 0;
1124   stream->fixed_pts = 0;
1125   stream->fixed_dts = 0;
1126   stream->nb_pts_rollover = 0;
1127   stream->nb_dts_rollover = 0;
1128   if (stream->flow_return == GST_FLOW_FLUSHING) {
1129     stream->flow_return = GST_FLOW_OK;
1130   }
1131   stream->continuity_counter = CONTINUITY_UNSET;
1132 }
1133
1134 static void
1135 gst_ts_demux_flush_streams (GstTSDemux * demux)
1136 {
1137   g_list_foreach (demux->program->stream_list,
1138       (GFunc) gst_ts_demux_stream_flush, NULL);
1139 }
1140
1141 static void
1142 gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
1143 {
1144   GstTSDemux *demux = GST_TS_DEMUX (base);
1145
1146   GST_DEBUG ("Current program %d, new program %d",
1147       demux->program_number, program->program_number);
1148
1149   if (demux->program_number == -1 ||
1150       demux->program_number == program->program_number) {
1151
1152     GST_LOG ("program %d started", program->program_number);
1153     demux->program_number = program->program_number;
1154     demux->program = program;
1155
1156     /* If this is not the initial program, we need to calculate
1157      * an update newsegment */
1158     demux->calculate_update_segment = !program->initial_program;
1159
1160     /* FIXME : When do we emit no_more_pads ? */
1161   }
1162 }
1163
1164 static void
1165 gst_ts_demux_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program)
1166 {
1167   GstTSDemux *demux = GST_TS_DEMUX (base);
1168
1169   if (demux->program == program) {
1170     demux->program = NULL;
1171     demux->program_number = -1;
1172   }
1173 }
1174
1175
1176 static inline void
1177 gst_ts_demux_record_pts (GstTSDemux * demux, TSDemuxStream * stream,
1178     guint64 pts, guint64 offset)
1179 {
1180   MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
1181
1182   if (pts == -1) {
1183     stream->pts = GST_CLOCK_TIME_NONE;
1184     return;
1185   }
1186
1187   GST_LOG ("pid 0x%04x pts:%" G_GUINT64_FORMAT " at offset %"
1188       G_GUINT64_FORMAT, bs->pid, pts, offset);
1189
1190   if (G_UNLIKELY (GST_CLOCK_TIME_IS_VALID (stream->pts) &&
1191           ABSDIFF (stream->raw_pts, pts) > 900000)) {
1192     /* Detect rollover if diff > 10s */
1193     GST_LOG ("Detected rollover (previous:%" G_GUINT64_FORMAT " new:%"
1194         G_GUINT64_FORMAT ")", stream->raw_pts, pts);
1195     if (pts < stream->raw_pts) {
1196       /* Forward rollover */
1197       GST_LOG ("Forward rollover, incrementing nb_pts_rollover");
1198       stream->nb_pts_rollover++;
1199     } else {
1200       /* Reverse rollover */
1201       GST_LOG ("Reverse rollover, decrementing nb_pts_rollover");
1202       stream->nb_pts_rollover--;
1203     }
1204   }
1205
1206   /* Compute PTS in GstClockTime */
1207   stream->raw_pts = pts;
1208   stream->fixed_pts = pts + stream->nb_pts_rollover * PTS_DTS_MAX_VALUE;
1209   stream->pts = MPEGTIME_TO_GSTTIME (stream->fixed_pts);
1210
1211   GST_LOG ("pid 0x%04x Stored PTS %" G_GUINT64_FORMAT " (%" GST_TIME_FORMAT ")",
1212       bs->pid, stream->raw_pts, GST_TIME_ARGS (stream->pts));
1213
1214
1215   if (G_UNLIKELY (demux->emit_statistics)) {
1216     GstStructure *st;
1217     st = gst_structure_new_id_empty (QUARK_TSDEMUX);
1218     gst_structure_id_set (st,
1219         QUARK_PID, G_TYPE_UINT, bs->pid,
1220         QUARK_OFFSET, G_TYPE_UINT64, offset, QUARK_PTS, G_TYPE_UINT64, pts,
1221         NULL);
1222     gst_element_post_message (GST_ELEMENT_CAST (demux),
1223         gst_message_new_element (GST_OBJECT (demux), st));
1224   }
1225 }
1226
1227 static inline void
1228 gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream,
1229     guint64 dts, guint64 offset)
1230 {
1231   MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
1232
1233   if (dts == -1) {
1234     stream->dts = GST_CLOCK_TIME_NONE;
1235     return;
1236   }
1237
1238   GST_LOG ("pid 0x%04x dts:%" G_GUINT64_FORMAT " at offset %"
1239       G_GUINT64_FORMAT, bs->pid, dts, offset);
1240
1241   if (G_UNLIKELY (GST_CLOCK_TIME_IS_VALID (stream->dts) &&
1242           ABSDIFF (stream->raw_dts, dts) > 900000)) {
1243     /* Detect rollover if diff > 10s */
1244     GST_LOG ("Detected rollover (previous:%" G_GUINT64_FORMAT " new:%"
1245         G_GUINT64_FORMAT ")", stream->raw_dts, dts);
1246     if (dts < stream->raw_dts) {
1247       /* Forward rollover */
1248       GST_LOG ("Forward rollover, incrementing nb_dts_rollover");
1249       stream->nb_dts_rollover++;
1250     } else {
1251       /* Reverse rollover */
1252       GST_LOG ("Reverse rollover, decrementing nb_dts_rollover");
1253       stream->nb_dts_rollover--;
1254     }
1255   }
1256
1257   /* Compute DTS in GstClockTime */
1258   stream->raw_dts = dts;
1259   stream->fixed_dts = dts + stream->nb_dts_rollover * PTS_DTS_MAX_VALUE;
1260   stream->dts = MPEGTIME_TO_GSTTIME (stream->fixed_dts);
1261
1262   GST_LOG ("pid 0x%04x Stored DTS %" G_GUINT64_FORMAT " (%" GST_TIME_FORMAT ")",
1263       bs->pid, stream->raw_dts, GST_TIME_ARGS (stream->dts));
1264
1265   if (G_UNLIKELY (demux->emit_statistics)) {
1266     GstStructure *st;
1267     st = gst_structure_new_id_empty (QUARK_TSDEMUX);
1268     gst_structure_id_set (st,
1269         QUARK_PID, G_TYPE_UINT, bs->pid,
1270         QUARK_OFFSET, G_TYPE_UINT64, offset, QUARK_DTS, G_TYPE_UINT64, dts,
1271         NULL);
1272     gst_element_post_message (GST_ELEMENT_CAST (demux),
1273         gst_message_new_element (GST_OBJECT (demux), st));
1274   }
1275 }
1276
1277 static void
1278 gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream,
1279     guint8 * data, guint32 length, guint64 bufferoffset)
1280 {
1281   MpegTSBase *base = (MpegTSBase *) demux;
1282   PESHeader header;
1283   gint offset = 0;
1284   PESParsingResult parseres;
1285
1286   GST_MEMDUMP ("Header buffer", data, MIN (length, 32));
1287
1288   parseres = mpegts_parse_pes_header (data, length, &header, &offset);
1289   if (G_UNLIKELY (parseres == PES_PARSING_NEED_MORE))
1290     goto discont;
1291   if (G_UNLIKELY (parseres == PES_PARSING_BAD)) {
1292     GST_WARNING ("Error parsing PES header. pid: 0x%x stream_type: 0x%x",
1293         stream->stream.pid, stream->stream.stream_type);
1294     goto discont;
1295   }
1296
1297   gst_ts_demux_record_dts (demux, stream, header.DTS, bufferoffset);
1298   gst_ts_demux_record_pts (demux, stream, header.PTS, bufferoffset);
1299
1300   GST_DEBUG_OBJECT (base,
1301       "stream PTS %" GST_TIME_FORMAT " DTS %" GST_TIME_FORMAT,
1302       GST_TIME_ARGS (stream->pts), GST_TIME_ARGS (stream->dts));
1303
1304   /* Remove PES headers */
1305   GST_DEBUG ("Moving data forward by %d bytes (packet_size:%d, have:%d)",
1306       header.header_size, header.packet_length, length);
1307   stream->expected_size = header.packet_length;
1308   if (stream->expected_size) {
1309     if (G_LIKELY (stream->expected_size > header.header_size)) {
1310       stream->expected_size -= header.header_size;
1311     } else {
1312       /* next packet will have to complete this one */
1313       GST_ERROR ("invalid header and packet size combination");
1314       stream->expected_size = 0;
1315     }
1316   }
1317   data += header.header_size;
1318   length -= header.header_size;
1319
1320   /* Create the output buffer */
1321   if (stream->expected_size)
1322     stream->allocated_size = stream->expected_size;
1323   else
1324     stream->allocated_size = 8192;
1325   g_assert (stream->data == NULL);
1326   stream->data = g_malloc (stream->allocated_size);
1327   memcpy (stream->data, data, length);
1328   stream->current_size = length;
1329
1330   stream->state = PENDING_PACKET_BUFFER;
1331
1332   return;
1333
1334 discont:
1335   stream->state = PENDING_PACKET_DISCONT;
1336   return;
1337 }
1338
1339  /* ONLY CALL THIS:
1340   * * WITH packet->payload != NULL
1341   * * WITH pending/current flushed out if beginning of new PES packet
1342   */
1343 static inline void
1344 gst_ts_demux_queue_data (GstTSDemux * demux, TSDemuxStream * stream,
1345     MpegTSPacketizerPacket * packet)
1346 {
1347   guint8 *data;
1348   guint size;
1349
1350   GST_DEBUG ("pid: 0x%04x state:%d", stream->stream.pid, stream->state);
1351
1352   size = packet->data_end - packet->payload;
1353   data = packet->payload;
1354
1355   if (stream->continuity_counter == CONTINUITY_UNSET) {
1356     GST_DEBUG ("CONTINUITY: Initialize to %d", packet->continuity_counter);
1357   } else if ((packet->continuity_counter == stream->continuity_counter + 1 ||
1358           (stream->continuity_counter == MAX_CONTINUITY &&
1359               packet->continuity_counter == 0))) {
1360     GST_LOG ("CONTINUITY: Got expected %d", packet->continuity_counter);
1361   } else {
1362     GST_ERROR ("CONTINUITY: Mismatch packet %d, stream %d",
1363         packet->continuity_counter, stream->continuity_counter);
1364     stream->state = PENDING_PACKET_DISCONT;
1365   }
1366   stream->continuity_counter = packet->continuity_counter;
1367
1368   if (stream->state == PENDING_PACKET_EMPTY) {
1369     if (G_UNLIKELY (!packet->payload_unit_start_indicator)) {
1370       stream->state = PENDING_PACKET_DISCONT;
1371       GST_WARNING ("Didn't get the first packet of this PES");
1372     } else {
1373       GST_LOG ("EMPTY=>HEADER");
1374       stream->state = PENDING_PACKET_HEADER;
1375     }
1376   }
1377
1378   switch (stream->state) {
1379     case PENDING_PACKET_HEADER:
1380     {
1381       GST_LOG ("HEADER: Parsing PES header");
1382
1383       /* parse the header */
1384       gst_ts_demux_parse_pes_header (demux, stream, data, size, packet->offset);
1385       break;
1386     }
1387     case PENDING_PACKET_BUFFER:
1388     {
1389       GST_LOG ("BUFFER: appending data");
1390       if (G_UNLIKELY (stream->current_size + size > stream->allocated_size)) {
1391         GST_LOG ("resizing buffer");
1392         stream->allocated_size = stream->allocated_size * 2;
1393         stream->data = g_realloc (stream->data, stream->allocated_size);
1394       }
1395       memcpy (stream->data + stream->current_size, data, size);
1396       stream->current_size += size;
1397       break;
1398     }
1399     case PENDING_PACKET_DISCONT:
1400     {
1401       GST_LOG ("DISCONT: not storing/pushing");
1402       if (G_UNLIKELY (stream->data)) {
1403         g_free (stream->data);
1404         stream->data = NULL;
1405       }
1406       stream->continuity_counter = CONTINUITY_UNSET;
1407       break;
1408     }
1409     default:
1410       break;
1411   }
1412
1413   return;
1414 }
1415
1416 static void
1417 calculate_and_push_newsegment (GstTSDemux * demux, TSDemuxStream * stream)
1418 {
1419   MpegTSBase *base = (MpegTSBase *) demux;
1420   GstClockTime lowest_pts = GST_CLOCK_TIME_NONE;
1421   GstClockTime firstts = 0;
1422   GList *tmp;
1423
1424   GST_DEBUG ("Creating new newsegment for stream %p", stream);
1425
1426   /* 0) If we don't have a time segment yet try to recover segment info from
1427    *    base when it's in time otherwise just initialize segment with
1428    *    defaults.
1429    *    It will happen only if it's first program or after flushes. */
1430   if (demux->segment.format == GST_FORMAT_UNDEFINED) {
1431     if (base->segment.format == GST_FORMAT_TIME) {
1432       demux->segment = base->segment;
1433       /* We can shortcut and create the segment event directly */
1434       demux->segment_event = gst_event_new_segment (&demux->segment);
1435     } else {
1436       gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1437     }
1438   }
1439
1440   /* 1) If we need to calculate an update newsegment, do it
1441    * 2) If we need to calculate a new newsegment, do it
1442    * 3) If an update_segment is valid, push it
1443    * 4) If a newsegment is valid, push it */
1444
1445   /* Speedup : if we don't need to calculate anything, go straight to pushing */
1446   if (!demux->calculate_update_segment && demux->segment_event)
1447     goto push_new_segment;
1448
1449   /* Calculate the 'new_start' value, used for both updates and newsegment */
1450   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
1451     TSDemuxStream *pstream = (TSDemuxStream *) tmp->data;
1452
1453     if (GST_CLOCK_TIME_IS_VALID (pstream->pts)) {
1454       if (!GST_CLOCK_TIME_IS_VALID (lowest_pts) || pstream->pts < lowest_pts)
1455         lowest_pts = pstream->pts;
1456     }
1457     if (GST_CLOCK_TIME_IS_VALID (pstream->dts)) {
1458       if (!GST_CLOCK_TIME_IS_VALID (lowest_pts) || pstream->dts < lowest_pts)
1459         lowest_pts = pstream->dts;
1460     }
1461   }
1462   if (GST_CLOCK_TIME_IS_VALID (lowest_pts))
1463     firstts =
1464         mpegts_packetizer_pts_to_ts (base->packetizer, lowest_pts,
1465         demux->program->pcr_pid);
1466   GST_DEBUG ("lowest_pts %" G_GUINT64_FORMAT " => clocktime %" GST_TIME_FORMAT,
1467       lowest_pts, GST_TIME_ARGS (firstts));
1468
1469   if (demux->calculate_update_segment) {
1470     GST_DEBUG ("Calculating update segment");
1471     /* If we have a valid segment, create an update of that */
1472     if (demux->segment.format == GST_FORMAT_TIME) {
1473       GstSegment update_segment;
1474       GST_DEBUG ("Re-using segment " SEGMENT_FORMAT,
1475           SEGMENT_ARGS (demux->segment));
1476       gst_segment_copy_into (&demux->segment, &update_segment);
1477       update_segment.stop = firstts;
1478       demux->update_segment = gst_event_new_segment (&update_segment);
1479     }
1480     demux->calculate_update_segment = FALSE;
1481   }
1482
1483   if (!demux->segment_event) {
1484     GstSegment new_segment;
1485
1486     GST_DEBUG ("Calculating actual segment");
1487
1488     gst_segment_copy_into (&demux->segment, &new_segment);
1489     if (new_segment.format != GST_FORMAT_TIME) {
1490       /* Start from the first ts/pts */
1491       new_segment.start = firstts;
1492       new_segment.stop = GST_CLOCK_TIME_NONE;
1493       new_segment.position = firstts;
1494     }
1495
1496     demux->segment_event = gst_event_new_segment (&new_segment);
1497   }
1498
1499 push_new_segment:
1500   if (demux->update_segment) {
1501     GST_DEBUG_OBJECT (stream->pad, "Pushing update segment");
1502     gst_event_ref (demux->update_segment);
1503     gst_pad_push_event (stream->pad, demux->update_segment);
1504   }
1505
1506   if (demux->segment_event) {
1507     GST_DEBUG_OBJECT (stream->pad, "Pushing newsegment event");
1508     gst_event_ref (demux->segment_event);
1509     gst_pad_push_event (stream->pad, demux->segment_event);
1510   }
1511
1512   /* Push pending tags */
1513   if (stream->taglist) {
1514     GST_DEBUG_OBJECT (stream->pad, "Sending tags %" GST_PTR_FORMAT,
1515         stream->taglist);
1516     gst_pad_push_event (stream->pad, gst_event_new_tag (stream->taglist));
1517     stream->taglist = NULL;
1518   }
1519
1520   stream->need_newsegment = FALSE;
1521 }
1522
1523 static GstFlowReturn
1524 gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
1525 {
1526   GstFlowReturn res = GST_FLOW_OK;
1527   MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
1528   GstBuffer *buffer = NULL;
1529   MpegTSPacketizer2 *packetizer = MPEG_TS_BASE_PACKETIZER (demux);
1530
1531   GST_DEBUG_OBJECT (stream->pad,
1532       "stream:%p, pid:0x%04x stream_type:%d state:%d", stream, bs->pid,
1533       bs->stream_type, stream->state);
1534
1535   if (G_UNLIKELY (stream->data == NULL)) {
1536     GST_LOG ("stream->data == NULL");
1537     goto beach;
1538   }
1539
1540   if (G_UNLIKELY (stream->state == PENDING_PACKET_EMPTY)) {
1541     GST_LOG ("EMPTY: returning");
1542     goto beach;
1543   }
1544
1545   if (G_UNLIKELY (stream->state != PENDING_PACKET_BUFFER)) {
1546     GST_LOG ("state:%d, returning", stream->state);
1547     goto beach;
1548   }
1549
1550   if (G_UNLIKELY (!stream->active))
1551     activate_pad_for_stream (demux, stream);
1552
1553   if (G_UNLIKELY (stream->pad == NULL)) {
1554     g_free (stream->data);
1555     goto beach;
1556   }
1557
1558   if (G_UNLIKELY (demux->program == NULL)) {
1559     GST_LOG_OBJECT (demux, "No program");
1560     g_free (stream->data);
1561     goto beach;
1562   }
1563
1564   if (G_UNLIKELY (stream->need_newsegment))
1565     calculate_and_push_newsegment (demux, stream);
1566
1567   buffer = gst_buffer_new_wrapped (stream->data, stream->current_size);
1568
1569   GST_DEBUG_OBJECT (stream->pad, "stream->pts %" GST_TIME_FORMAT,
1570       GST_TIME_ARGS (stream->pts));
1571   if (GST_CLOCK_TIME_IS_VALID (stream->pts))
1572     GST_BUFFER_PTS (buffer) =
1573         mpegts_packetizer_pts_to_ts (packetizer, stream->pts,
1574         demux->program->pcr_pid);
1575   if (GST_CLOCK_TIME_IS_VALID (stream->dts))
1576     GST_BUFFER_DTS (buffer) =
1577         mpegts_packetizer_pts_to_ts (packetizer, stream->dts,
1578         demux->program->pcr_pid);
1579
1580   GST_DEBUG_OBJECT (stream->pad,
1581       "Pushing buffer with PTS: %" GST_TIME_FORMAT " , DTS: %" GST_TIME_FORMAT,
1582       GST_TIME_ARGS (GST_BUFFER_PTS (buffer)),
1583       GST_TIME_ARGS (GST_BUFFER_DTS (buffer)));
1584
1585   res = gst_pad_push (stream->pad, buffer);
1586   GST_DEBUG_OBJECT (stream->pad, "Returned %s", gst_flow_get_name (res));
1587   res = tsdemux_combine_flows (demux, stream, res);
1588   GST_DEBUG_OBJECT (stream->pad, "combined %s", gst_flow_get_name (res));
1589
1590 beach:
1591   /* Reset everything */
1592   GST_LOG ("Resetting to EMPTY, returning %s", gst_flow_get_name (res));
1593   stream->state = PENDING_PACKET_EMPTY;
1594   stream->data = NULL;
1595   stream->expected_size = 0;
1596   stream->current_size = 0;
1597
1598   return res;
1599 }
1600
1601 static GstFlowReturn
1602 gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream,
1603     MpegTSPacketizerPacket * packet, MpegTSPacketizerSection * section)
1604 {
1605   GstFlowReturn res = GST_FLOW_OK;
1606
1607   GST_DEBUG ("data:%p", packet->data);
1608   GST_LOG ("pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p", packet->pid,
1609       packet->payload_unit_start_indicator, packet->adaptation_field_control,
1610       packet->continuity_counter, packet->payload);
1611
1612   if (section) {
1613     GST_DEBUG ("section complete:%d, buffer size %d",
1614         section->complete, section->section_length);
1615     return res;
1616   }
1617
1618   if (G_UNLIKELY (packet->payload_unit_start_indicator) &&
1619       packet->adaptation_field_control & 0x1)
1620     /* Flush previous data */
1621     res = gst_ts_demux_push_pending_data (demux, stream);
1622
1623   if (packet->payload && (res == GST_FLOW_OK || res == GST_FLOW_NOT_LINKED)
1624       && stream->pad) {
1625     gst_ts_demux_queue_data (demux, stream, packet);
1626     GST_DEBUG ("current_size:%d, expected_size:%d",
1627         stream->current_size, stream->expected_size);
1628     /* Finally check if the data we queued completes a packet */
1629     if (stream->expected_size && stream->current_size == stream->expected_size) {
1630       GST_LOG ("pushing complete packet");
1631       res = gst_ts_demux_push_pending_data (demux, stream);
1632     }
1633   }
1634
1635   return res;
1636 }
1637
1638 static void
1639 gst_ts_demux_flush (MpegTSBase * base)
1640 {
1641   GstTSDemux *demux = GST_TS_DEMUX_CAST (base);
1642
1643   gst_ts_demux_flush_streams (demux);
1644
1645   if (demux->segment_event) {
1646     gst_event_unref (demux->segment_event);
1647     demux->segment_event = NULL;
1648   }
1649   demux->calculate_update_segment = FALSE;
1650   gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
1651 }
1652
1653 static GstFlowReturn
1654 gst_ts_demux_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
1655     MpegTSPacketizerSection * section)
1656 {
1657   GstTSDemux *demux = GST_TS_DEMUX_CAST (base);
1658   TSDemuxStream *stream = NULL;
1659   GstFlowReturn res = GST_FLOW_OK;
1660
1661   if (G_LIKELY (demux->program)) {
1662     stream = (TSDemuxStream *) demux->program->streams[packet->pid];
1663
1664     if (stream) {
1665       res = gst_ts_demux_handle_packet (demux, stream, packet, section);
1666     }
1667   }
1668   return res;
1669 }
1670
1671 gboolean
1672 gst_ts_demux_plugin_init (GstPlugin * plugin)
1673 {
1674   GST_DEBUG_CATEGORY_INIT (ts_demux_debug, "tsdemux", 0,
1675       "MPEG transport stream demuxer");
1676   init_pes_parser ();
1677
1678   return gst_element_register (plugin, "tsdemux",
1679       GST_RANK_PRIMARY, GST_TYPE_TS_DEMUX);
1680 }