390d080a34e1d3756bb618dfa5b1ac88178448a9
[platform/upstream/gstreamer.git] / gst / mpegtsdemux / tsdemux.c
1 /*
2  * tsdemux.c
3  * Copyright (C) 2009 Zaheer Abbas Merali
4  *               2010 Edward Hervey
5  * Copyright (C) 2011, Hewlett-Packard Development Company, L.P.
6  *  Author: Youness Alaoui <youness.alaoui@collabora.co.uk>, Collabora Ltd.
7  *  Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>, Collabora Ltd.
8  *  Author: Edward Hervey <bilboed@bilboed.com>, Collabora Ltd.
9  *
10  * Authors:
11  *   Zaheer Abbas Merali <zaheerabbas at merali dot org>
12  *   Edward Hervey <edward.hervey@collabora.co.uk>
13  *
14  * This library is free software; you can redistribute it and/or
15  * modify it under the terms of the GNU Library General Public
16  * License as published by the Free Software Foundation; either
17  * version 2 of the License, or (at your option) any later version.
18  *
19  * This library is distributed in the hope that it will be useful,
20  * but WITHOUT ANY WARRANTY; without even the implied warranty of
21  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22  * Library General Public License for more details.
23  *
24  * You should have received a copy of the GNU Library General Public
25  * License along with this library; if not, write to the
26  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
27  * Boston, MA 02110-1301, USA.
28  */
29
30 #ifdef HAVE_CONFIG_H
31 #include "config.h"
32 #endif
33
34 #include <stdlib.h>
35 #include <string.h>
36
37 #include <glib.h>
38 #include <gst/tag/tag.h>
39 #include <gst/pbutils/pbutils.h>
40
41 #include "mpegtsbase.h"
42 #include "tsdemux.h"
43 #include "gstmpegdesc.h"
44 #include "gstmpegdefs.h"
45 #include "mpegtspacketizer.h"
46 #include "pesparse.h"
47
48 /*
49  * tsdemux
50  *
51  * See TODO for explanations on improvements needed
52  */
53
54 #define CONTINUITY_UNSET 255
55 #define MAX_CONTINUITY 15
56
57 /* Seeking/Scanning related variables */
58
59 /* seek to SEEK_TIMESTAMP_OFFSET before the desired offset and search then
60  * either accurately or for the next timestamp
61  */
62 #define SEEK_TIMESTAMP_OFFSET (500 * GST_MSECOND)
63
64 #define SEGMENT_FORMAT "[format:%s, rate:%f, start:%"                   \
65   GST_TIME_FORMAT", stop:%"GST_TIME_FORMAT", time:%"GST_TIME_FORMAT     \
66   ", base:%"GST_TIME_FORMAT", position:%"GST_TIME_FORMAT                \
67   ", duration:%"GST_TIME_FORMAT"]"
68
69 #define SEGMENT_ARGS(a) gst_format_get_name((a).format), (a).rate,      \
70     GST_TIME_ARGS((a).start), GST_TIME_ARGS((a).stop),                  \
71     GST_TIME_ARGS((a).time), GST_TIME_ARGS((a).base),                   \
72     GST_TIME_ARGS((a).position), GST_TIME_ARGS((a).duration)
73
74
75 GST_DEBUG_CATEGORY_STATIC (ts_demux_debug);
76 #define GST_CAT_DEFAULT ts_demux_debug
77
78 #define ABSDIFF(a,b) (((a) > (b)) ? ((a) - (b)) : ((b) - (a)))
79
80 static GQuark QUARK_TSDEMUX;
81 static GQuark QUARK_PID;
82 static GQuark QUARK_PCR;
83 static GQuark QUARK_OPCR;
84 static GQuark QUARK_PTS;
85 static GQuark QUARK_DTS;
86 static GQuark QUARK_OFFSET;
87
88 typedef enum
89 {
90   PENDING_PACKET_EMPTY = 0,     /* No pending packet/buffer
91                                  * Push incoming buffers to the array */
92   PENDING_PACKET_HEADER,        /* PES header needs to be parsed
93                                  * Push incoming buffers to the array */
94   PENDING_PACKET_BUFFER,        /* Currently filling up output buffer
95                                  * Push incoming buffers to the bufferlist */
96   PENDING_PACKET_DISCONT        /* Discontinuity in incoming packets
97                                  * Drop all incoming buffers */
98 } PendingPacketState;
99
100 /* Pending buffer */
101 typedef struct
102 {
103   /* The fully reconstructed buffer */
104   GstBuffer *buffer;
105
106   /* Raw PTS/DTS (in 90kHz units) */
107   guint64 pts, dts;
108 } PendingBuffer;
109
110 typedef struct _TSDemuxStream TSDemuxStream;
111
112 struct _TSDemuxStream
113 {
114   MpegTSBaseStream stream;
115
116   GstPad *pad;
117
118   /* Whether the pad was added or not */
119   gboolean active;
120
121   /* TRUE if we are waiting for a valid timestamp */
122   gboolean pending_ts;
123
124   /* the return of the latest push */
125   GstFlowReturn flow_return;
126
127   /* Output data */
128   PendingPacketState state;
129
130   /* Data being reconstructed (allocated) */
131   guint8 *data;
132
133   /* Size of data being reconstructed (if known, else 0) */
134   guint expected_size;
135
136   /* Amount of bytes in current ->data */
137   guint current_size;
138   /* Size of ->data */
139   guint allocated_size;
140
141   /* Current PTS/DTS for this stream (in running time) */
142   GstClockTime pts;
143   GstClockTime dts;
144
145   /* Current PTS/DTS for this stream (in 90kHz unit) */
146   guint64 raw_pts, raw_dts;
147
148   /* Whether this stream needs to send a newsegment */
149   gboolean need_newsegment;
150
151   /* The value to use when calculating the newsegment */
152   GstClockTime first_dts;
153
154   GstTagList *taglist;
155
156   gint continuity_counter;
157
158   /* List of pending buffers */
159   GList *pending;
160 };
161
162 #define VIDEO_CAPS \
163   GST_STATIC_CAPS (\
164     "video/mpeg, " \
165       "mpegversion = (int) { 1, 2, 4 }, " \
166       "systemstream = (boolean) FALSE; " \
167     "video/x-h264,stream-format=(string)byte-stream," \
168       "alignment=(string)nal;" \
169     "video/x-dirac;" \
170     "video/x-wmv," \
171       "wmvversion = (int) 3, " \
172       "format = (string) WVC1" \
173   )
174
175 #define AUDIO_CAPS \
176   GST_STATIC_CAPS ( \
177     "audio/mpeg, " \
178       "mpegversion = (int) 1;" \
179     "audio/mpeg, " \
180       "mpegversion = (int) 2, " \
181       "stream-format = (string) adts; " \
182     "audio/mpeg, " \
183       "mpegversion = (int) 4, " \
184       "stream-format = (string) loas; " \
185     "audio/x-lpcm, " \
186       "width = (int) { 16, 20, 24 }, " \
187       "rate = (int) { 48000, 96000 }, " \
188       "channels = (int) [ 1, 8 ], " \
189       "dynamic_range = (int) [ 0, 255 ], " \
190       "emphasis = (boolean) { FALSE, TRUE }, " \
191       "mute = (boolean) { FALSE, TRUE }; " \
192     "audio/x-ac3; audio/x-eac3;" \
193     "audio/x-dts;" \
194     "audio/x-private-ts-lpcm" \
195   )
196
197 /* Can also use the subpicture pads for text subtitles? */
198 #define SUBPICTURE_CAPS \
199     GST_STATIC_CAPS ("subpicture/x-pgs; subpicture/x-dvd")
200
201 static GstStaticPadTemplate video_template =
202 GST_STATIC_PAD_TEMPLATE ("video_%04x", GST_PAD_SRC,
203     GST_PAD_SOMETIMES,
204     VIDEO_CAPS);
205
206 static GstStaticPadTemplate audio_template =
207 GST_STATIC_PAD_TEMPLATE ("audio_%04x",
208     GST_PAD_SRC,
209     GST_PAD_SOMETIMES,
210     AUDIO_CAPS);
211
212 static GstStaticPadTemplate subpicture_template =
213 GST_STATIC_PAD_TEMPLATE ("subpicture_%04x",
214     GST_PAD_SRC,
215     GST_PAD_SOMETIMES,
216     SUBPICTURE_CAPS);
217
218 static GstStaticPadTemplate private_template =
219 GST_STATIC_PAD_TEMPLATE ("private_%04x",
220     GST_PAD_SRC,
221     GST_PAD_SOMETIMES,
222     GST_STATIC_CAPS_ANY);
223
224 enum
225 {
226   ARG_0,
227   PROP_PROGRAM_NUMBER,
228   PROP_EMIT_STATS,
229   /* FILL ME */
230 };
231
232 /* Pad functions */
233
234
235 /* mpegtsbase methods */
236 static void
237 gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program);
238 static void
239 gst_ts_demux_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program);
240 static void gst_ts_demux_reset (MpegTSBase * base);
241 static GstFlowReturn
242 gst_ts_demux_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
243     GstMpegTsSection * section);
244 static void gst_ts_demux_flush (MpegTSBase * base, gboolean hard);
245 static void
246 gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * stream,
247     MpegTSBaseProgram * program);
248 static void
249 gst_ts_demux_stream_removed (MpegTSBase * base, MpegTSBaseStream * stream);
250 static GstFlowReturn gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event);
251 static void gst_ts_demux_set_property (GObject * object, guint prop_id,
252     const GValue * value, GParamSpec * pspec);
253 static void gst_ts_demux_get_property (GObject * object, guint prop_id,
254     GValue * value, GParamSpec * pspec);
255 static void gst_ts_demux_flush_streams (GstTSDemux * tsdemux);
256 static GstFlowReturn
257 gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream);
258 static void gst_ts_demux_stream_flush (TSDemuxStream * stream);
259
260 static gboolean push_event (MpegTSBase * base, GstEvent * event);
261
262 static void
263 _extra_init (void)
264 {
265   QUARK_TSDEMUX = g_quark_from_string ("tsdemux");
266   QUARK_PID = g_quark_from_string ("pid");
267   QUARK_PCR = g_quark_from_string ("pcr");
268   QUARK_OPCR = g_quark_from_string ("opcr");
269   QUARK_PTS = g_quark_from_string ("pts");
270   QUARK_DTS = g_quark_from_string ("dts");
271   QUARK_OFFSET = g_quark_from_string ("offset");
272 }
273
274 #define gst_ts_demux_parent_class parent_class
275 G_DEFINE_TYPE_WITH_CODE (GstTSDemux, gst_ts_demux, GST_TYPE_MPEGTS_BASE,
276     _extra_init ());
277
278 static void
279 gst_ts_demux_class_init (GstTSDemuxClass * klass)
280 {
281   GObjectClass *gobject_class;
282   GstElementClass *element_class;
283   MpegTSBaseClass *ts_class;
284
285   gobject_class = G_OBJECT_CLASS (klass);
286   gobject_class->set_property = gst_ts_demux_set_property;
287   gobject_class->get_property = gst_ts_demux_get_property;
288
289   g_object_class_install_property (gobject_class, PROP_PROGRAM_NUMBER,
290       g_param_spec_int ("program-number", "Program number",
291           "Program Number to demux for (-1 to ignore)", -1, G_MAXINT,
292           -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
293
294   g_object_class_install_property (gobject_class, PROP_EMIT_STATS,
295       g_param_spec_boolean ("emit-stats", "Emit statistics",
296           "Emit messages for every pcr/opcr/pts/dts", FALSE,
297           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
298
299   element_class = GST_ELEMENT_CLASS (klass);
300   gst_element_class_add_pad_template (element_class,
301       gst_static_pad_template_get (&video_template));
302   gst_element_class_add_pad_template (element_class,
303       gst_static_pad_template_get (&audio_template));
304   gst_element_class_add_pad_template (element_class,
305       gst_static_pad_template_get (&subpicture_template));
306   gst_element_class_add_pad_template (element_class,
307       gst_static_pad_template_get (&private_template));
308
309   gst_element_class_set_static_metadata (element_class,
310       "MPEG transport stream demuxer",
311       "Codec/Demuxer",
312       "Demuxes MPEG2 transport streams",
313       "Zaheer Abbas Merali <zaheerabbas at merali dot org>\n"
314       "Edward Hervey <edward.hervey@collabora.co.uk>");
315
316   ts_class = GST_MPEGTS_BASE_CLASS (klass);
317   ts_class->reset = GST_DEBUG_FUNCPTR (gst_ts_demux_reset);
318   ts_class->push = GST_DEBUG_FUNCPTR (gst_ts_demux_push);
319   ts_class->push_event = GST_DEBUG_FUNCPTR (push_event);
320   ts_class->program_started = GST_DEBUG_FUNCPTR (gst_ts_demux_program_started);
321   ts_class->program_stopped = GST_DEBUG_FUNCPTR (gst_ts_demux_program_stopped);
322   ts_class->stream_added = gst_ts_demux_stream_added;
323   ts_class->stream_removed = gst_ts_demux_stream_removed;
324   ts_class->seek = GST_DEBUG_FUNCPTR (gst_ts_demux_do_seek);
325   ts_class->flush = GST_DEBUG_FUNCPTR (gst_ts_demux_flush);
326 }
327
328 static void
329 gst_ts_demux_reset (MpegTSBase * base)
330 {
331   GstTSDemux *demux = (GstTSDemux *) base;
332
333   demux->calculate_update_segment = FALSE;
334
335   demux->rate = 1.0;
336   gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
337   if (demux->segment_event) {
338     gst_event_unref (demux->segment_event);
339     demux->segment_event = NULL;
340   }
341
342   if (demux->update_segment) {
343     gst_event_unref (demux->update_segment);
344     demux->update_segment = NULL;
345   }
346
347   demux->have_group_id = FALSE;
348   demux->group_id = G_MAXUINT;
349 }
350
351 static void
352 gst_ts_demux_init (GstTSDemux * demux)
353 {
354   MpegTSBase *base = (MpegTSBase *) demux;
355
356   base->stream_size = sizeof (TSDemuxStream);
357   base->parse_private_sections = TRUE;
358   /* We are not interested in sections (all handled by mpegtsbase) */
359   base->push_section = FALSE;
360
361   demux->requested_program_number = -1;
362   demux->program_number = -1;
363   gst_ts_demux_reset (base);
364 }
365
366
367 static void
368 gst_ts_demux_set_property (GObject * object, guint prop_id,
369     const GValue * value, GParamSpec * pspec)
370 {
371   GstTSDemux *demux = GST_TS_DEMUX (object);
372
373   switch (prop_id) {
374     case PROP_PROGRAM_NUMBER:
375       /* FIXME: do something if program is switched as opposed to set at
376        * beginning */
377       demux->requested_program_number = g_value_get_int (value);
378       break;
379     case PROP_EMIT_STATS:
380       demux->emit_statistics = g_value_get_boolean (value);
381       break;
382     default:
383       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
384   }
385 }
386
387 static void
388 gst_ts_demux_get_property (GObject * object, guint prop_id,
389     GValue * value, GParamSpec * pspec)
390 {
391   GstTSDemux *demux = GST_TS_DEMUX (object);
392
393   switch (prop_id) {
394     case PROP_PROGRAM_NUMBER:
395       g_value_set_int (value, demux->requested_program_number);
396       break;
397     case PROP_EMIT_STATS:
398       g_value_set_boolean (value, demux->emit_statistics);
399       break;
400     default:
401       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
402   }
403 }
404
405 static gboolean
406 gst_ts_demux_srcpad_query (GstPad * pad, GstObject * parent, GstQuery * query)
407 {
408   gboolean res = TRUE;
409   GstFormat format;
410   GstTSDemux *demux;
411   MpegTSBase *base;
412
413   demux = GST_TS_DEMUX (parent);
414   base = GST_MPEGTS_BASE (demux);
415
416   switch (GST_QUERY_TYPE (query)) {
417     case GST_QUERY_DURATION:
418     {
419       GST_DEBUG ("query duration");
420       gst_query_parse_duration (query, &format, NULL);
421       if (format == GST_FORMAT_TIME) {
422         if (!gst_pad_peer_query (base->sinkpad, query)) {
423           gint64 val;
424
425           format = GST_FORMAT_BYTES;
426           if (!gst_pad_peer_query_duration (base->sinkpad, format, &val))
427             res = FALSE;
428           else {
429             GstClockTime dur =
430                 mpegts_packetizer_offset_to_ts (base->packetizer, val,
431                 demux->program->pcr_pid);
432             if (GST_CLOCK_TIME_IS_VALID (dur))
433               gst_query_set_duration (query, GST_FORMAT_TIME, dur);
434             else
435               res = FALSE;
436           }
437         }
438       } else {
439         GST_DEBUG_OBJECT (demux, "only query duration on TIME is supported");
440         res = FALSE;
441       }
442       break;
443     }
444     case GST_QUERY_LATENCY:
445     {
446       GST_DEBUG ("query latency");
447       res = gst_pad_peer_query (base->sinkpad, query);
448       if (res && base->upstream_live) {
449         GstClockTime min_lat, max_lat;
450         gboolean live;
451
452         /* According to H.222.0
453            Annex D.0.3 (System Time Clock recovery in the decoder)
454            and D.0.2 (Audio and video presentation synchronization)
455
456            We can end up with an interval of up to 700ms between valid
457            PCR/SCR. We therefore allow a latency of 700ms for that.
458          */
459         gst_query_parse_latency (query, &live, &min_lat, &max_lat);
460         if (min_lat != -1)
461           min_lat += 700 * GST_MSECOND;
462         if (max_lat != -1)
463           max_lat += 700 * GST_MSECOND;
464         gst_query_set_latency (query, live, min_lat, max_lat);
465       }
466       break;
467     }
468     case GST_QUERY_SEEKING:
469     {
470       GST_DEBUG ("query seeking");
471       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
472       if (format == GST_FORMAT_TIME) {
473         gboolean seekable = FALSE;
474
475         if (gst_pad_peer_query (base->sinkpad, query))
476           gst_query_parse_seeking (query, NULL, &seekable, NULL, NULL);
477
478         /* If upstream is not seekable in TIME format we use
479          * our own values here */
480         if (!seekable)
481           gst_query_set_seeking (query, GST_FORMAT_TIME, TRUE, 0,
482               demux->segment.duration);
483       } else {
484         GST_DEBUG_OBJECT (demux, "only TIME is supported for query seeking");
485         res = FALSE;
486       }
487       break;
488     }
489     case GST_QUERY_SEGMENT:{
490       GstFormat format;
491       gint64 start, stop;
492
493       format = demux->segment.format;
494
495       start =
496           gst_segment_to_stream_time (&demux->segment, format,
497           demux->segment.start);
498       if ((stop = demux->segment.stop) == -1)
499         stop = demux->segment.duration;
500       else
501         stop = gst_segment_to_stream_time (&demux->segment, format, stop);
502
503       gst_query_set_segment (query, demux->segment.rate, format, start, stop);
504       res = TRUE;
505       break;
506     }
507     default:
508       res = gst_pad_query_default (pad, parent, query);
509   }
510
511   return res;
512
513 }
514
515 static GstFlowReturn
516 gst_ts_demux_do_seek (MpegTSBase * base, GstEvent * event)
517 {
518   GstTSDemux *demux = (GstTSDemux *) base;
519   GstFlowReturn res = GST_FLOW_ERROR;
520   gdouble rate;
521   GstFormat format;
522   GstSeekFlags flags;
523   GstSeekType start_type, stop_type;
524   gint64 start, stop;
525   GstSegment seeksegment;
526   gboolean update;
527   guint64 start_offset;
528
529   gst_event_parse_seek (event, &rate, &format, &flags, &start_type, &start,
530       &stop_type, &stop);
531
532   GST_DEBUG ("seek event, rate: %f start: %" GST_TIME_FORMAT
533       " stop: %" GST_TIME_FORMAT, rate, GST_TIME_ARGS (start),
534       GST_TIME_ARGS (stop));
535
536   if (rate <= 0.0) {
537     GST_WARNING ("Negative rate not supported");
538     goto done;
539   }
540
541   if (flags & (GST_SEEK_FLAG_SEGMENT)) {
542     GST_WARNING ("seek flags 0x%x are not supported", (int) flags);
543     goto done;
544   }
545
546   /* copy segment, we need this because we still need the old
547    * segment when we close the current segment. */
548   memcpy (&seeksegment, &demux->segment, sizeof (GstSegment));
549
550   /* configure the segment with the seek variables */
551   GST_DEBUG_OBJECT (demux, "configuring seek");
552   GST_DEBUG ("seeksegment before set_seek " SEGMENT_FORMAT,
553       SEGMENT_ARGS (seeksegment));
554
555   gst_segment_do_seek (&seeksegment, rate, format, flags, start_type, start,
556       stop_type, stop, &update);
557
558   GST_DEBUG ("seeksegment after set_seek " SEGMENT_FORMAT,
559       SEGMENT_ARGS (seeksegment));
560
561   /* Convert start/stop to offset */
562   start_offset =
563       mpegts_packetizer_ts_to_offset (base->packetizer, MAX (0,
564           start - SEEK_TIMESTAMP_OFFSET), demux->program->pcr_pid);
565
566   if (G_UNLIKELY (start_offset == -1)) {
567     GST_WARNING ("Couldn't convert start position to an offset");
568     goto done;
569   }
570
571   /* record offset and rate */
572   base->seek_offset = start_offset;
573   demux->rate = rate;
574   res = GST_FLOW_OK;
575
576   /* Drop segment info, it needs to be recreated after the actual seek */
577   gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
578   if (demux->segment_event) {
579     gst_event_unref (demux->segment_event);
580     demux->segment_event = NULL;
581   }
582
583 done:
584   return res;
585 }
586
587 static gboolean
588 gst_ts_demux_srcpad_event (GstPad * pad, GstObject * parent, GstEvent * event)
589 {
590   gboolean res = TRUE;
591   GstTSDemux *demux = GST_TS_DEMUX (parent);
592
593   GST_DEBUG_OBJECT (pad, "Got event %s",
594       gst_event_type_get_name (GST_EVENT_TYPE (event)));
595
596   switch (GST_EVENT_TYPE (event)) {
597     case GST_EVENT_SEEK:
598       res = mpegts_base_handle_seek_event ((MpegTSBase *) demux, pad, event);
599       if (!res)
600         GST_WARNING ("seeking failed");
601       gst_event_unref (event);
602       break;
603     default:
604       res = gst_pad_event_default (pad, parent, event);
605   }
606
607   return res;
608 }
609
610 static gboolean
611 push_event (MpegTSBase * base, GstEvent * event)
612 {
613   GstTSDemux *demux = (GstTSDemux *) base;
614   GList *tmp;
615
616   if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
617     GST_DEBUG_OBJECT (base, "Ignoring segment event (recreated later)");
618     gst_event_unref (event);
619     return TRUE;
620   }
621
622   if (G_UNLIKELY (demux->program == NULL)) {
623     gst_event_unref (event);
624     return FALSE;
625   }
626
627   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
628     TSDemuxStream *stream = (TSDemuxStream *) tmp->data;
629     if (stream->pad) {
630       /* If we are pushing out EOS, flush out pending data first */
631       if (GST_EVENT_TYPE (event) == GST_EVENT_EOS && stream->active &&
632           gst_pad_is_active (stream->pad))
633         gst_ts_demux_push_pending_data (demux, stream);
634
635       gst_event_ref (event);
636       gst_pad_push_event (stream->pad, event);
637     }
638   }
639
640   gst_event_unref (event);
641
642   return TRUE;
643 }
644
645 static GstFlowReturn
646 tsdemux_combine_flows (GstTSDemux * demux, TSDemuxStream * stream,
647     GstFlowReturn ret)
648 {
649   GList *tmp;
650
651   /* Store the value */
652   stream->flow_return = ret;
653
654   /* any other error that is not-linked can be returned right away */
655   if (ret != GST_FLOW_NOT_LINKED)
656     goto done;
657
658   /* Only return NOT_LINKED if all other pads returned NOT_LINKED */
659   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
660     stream = (TSDemuxStream *) tmp->data;
661     if (stream->pad) {
662       ret = stream->flow_return;
663       /* some other return value (must be SUCCESS but we can return
664        * other values as well) */
665       if (ret != GST_FLOW_NOT_LINKED)
666         goto done;
667     }
668     /* if we get here, all other pads were unlinked and we return
669      * NOT_LINKED then */
670   }
671
672 done:
673   return ret;
674 }
675
676 static inline void
677 add_iso639_language_to_tags (TSDemuxStream * stream, gchar * lang_code)
678 {
679   const gchar *lc;
680
681   GST_LOG ("Add language code for stream: '%s'", lang_code);
682
683   if (!stream->taglist)
684     stream->taglist = gst_tag_list_new_empty ();
685
686   /* descriptor contains ISO 639-2 code, we want the ISO 639-1 code */
687   lc = gst_tag_get_language_code (lang_code);
688
689   /* Only set tag if we have a valid one */
690   if (lc || (lang_code[0] && lang_code[1]))
691     gst_tag_list_add (stream->taglist, GST_TAG_MERGE_REPLACE,
692         GST_TAG_LANGUAGE_CODE, (lc) ? lc : lang_code, NULL);
693 }
694
695 static void
696 gst_ts_demux_create_tags (TSDemuxStream * stream)
697 {
698   MpegTSBaseStream *bstream = (MpegTSBaseStream *) stream;
699   const GstMpegTsDescriptor *desc = NULL;
700   int i, nb;
701
702   desc =
703       mpegts_get_descriptor_from_stream (bstream,
704       GST_MTS_DESC_ISO_639_LANGUAGE);
705   if (desc) {
706     gchar lang_code[4];
707
708     nb = gst_mpegts_descriptor_parse_iso_639_language_nb (desc);
709
710     GST_DEBUG ("Found ISO 639 descriptor (%d entries)", nb);
711
712     for (i = 0; i < nb; i++)
713       if (gst_mpegts_descriptor_parse_iso_639_language_idx (desc, i, &lang_code,
714               NULL))
715         add_iso639_language_to_tags (stream, lang_code);
716
717     return;
718   }
719
720   desc =
721       mpegts_get_descriptor_from_stream (bstream, GST_MTS_DESC_DVB_SUBTITLING);
722
723   if (desc) {
724     gchar lang_code[4];
725
726     nb = gst_mpegts_descriptor_parse_dvb_subtitling_nb (desc);
727
728     GST_DEBUG ("Found SUBTITLING descriptor (%d entries)", nb);
729
730     for (i = 0; i < nb; i++)
731       if (gst_mpegts_descriptor_parse_dvb_subtitling_idx (desc, i, &lang_code,
732               NULL, NULL, NULL))
733         add_iso639_language_to_tags (stream, lang_code);
734   }
735 }
736
737 static GstPad *
738 create_pad_for_stream (MpegTSBase * base, MpegTSBaseStream * bstream,
739     MpegTSBaseProgram * program)
740 {
741   GstTSDemux *demux = GST_TS_DEMUX (base);
742   TSDemuxStream *stream = (TSDemuxStream *) bstream;
743   gchar *name = NULL;
744   GstCaps *caps = NULL;
745   GstPadTemplate *template = NULL;
746   const GstMpegTsDescriptor *desc = NULL;
747   GstPad *pad = NULL;
748
749   gst_ts_demux_create_tags (stream);
750
751   GST_LOG ("Attempting to create pad for stream 0x%04x with stream_type %d",
752       bstream->pid, bstream->stream_type);
753
754   /* First handle BluRay-specific stream types since there is some overlap
755    * between BluRay and non-BluRay streay type identifiers */
756   if (program->registration_id == DRF_ID_HDMV) {
757     switch (bstream->stream_type) {
758       case ST_BD_AUDIO_AC3:
759       {
760         const GstMpegTsDescriptor *ac3_desc;
761
762         /* ATSC ac3 audio descriptor */
763         ac3_desc =
764             mpegts_get_descriptor_from_stream (bstream,
765             GST_MTS_DESC_AC3_AUDIO_STREAM);
766         if (ac3_desc && DESC_AC_AUDIO_STREAM_bsid (ac3_desc->data) != 16) {
767           GST_LOG ("ac3 audio");
768           template = gst_static_pad_template_get (&audio_template);
769           name = g_strdup_printf ("audio_%04x", bstream->pid);
770           caps = gst_caps_new_empty_simple ("audio/x-ac3");
771         } else {
772           template = gst_static_pad_template_get (&audio_template);
773           name = g_strdup_printf ("audio_%04x", bstream->pid);
774           caps = gst_caps_new_empty_simple ("audio/x-eac3");
775         }
776         break;
777       }
778       case ST_BD_AUDIO_EAC3:
779         template = gst_static_pad_template_get (&audio_template);
780         name = g_strdup_printf ("audio_%04x", bstream->pid);
781         caps = gst_caps_new_empty_simple ("audio/x-eac3");
782         break;
783       case ST_BD_AUDIO_AC3_TRUE_HD:
784         template = gst_static_pad_template_get (&audio_template);
785         name = g_strdup_printf ("audio_%04x", bstream->pid);
786         caps = gst_caps_new_empty_simple ("audio/x-true-hd");
787         break;
788       case ST_BD_AUDIO_LPCM:
789         template = gst_static_pad_template_get (&audio_template);
790         name = g_strdup_printf ("audio_%04x", bstream->pid);
791         caps = gst_caps_new_empty_simple ("audio/x-private-ts-lpcm");
792         break;
793       case ST_BD_PGS_SUBPICTURE:
794         template = gst_static_pad_template_get (&subpicture_template);
795         name = g_strdup_printf ("subpicture_%04x", bstream->pid);
796         caps = gst_caps_new_empty_simple ("subpicture/x-pgs");
797         break;
798     }
799   }
800   if (template && name && caps)
801     goto done;
802
803   /* Handle non-BluRay stream types */
804   switch (bstream->stream_type) {
805     case GST_MPEG_TS_STREAM_TYPE_VIDEO_MPEG1:
806     case GST_MPEG_TS_STREAM_TYPE_VIDEO_MPEG2:
807     case ST_PS_VIDEO_MPEG2_DCII:
808       /* FIXME : Use DCII registration code (ETV1 ?) to handle that special
809        * Stream type (ST_PS_VIDEO_MPEG2_DCII) */
810       /* FIXME : Use video decriptor (0x1) to refine caps with:
811        * * frame_rate
812        * * profile_and_level
813        */
814       GST_LOG ("mpeg video");
815       template = gst_static_pad_template_get (&video_template);
816       name = g_strdup_printf ("video_%04x", bstream->pid);
817       caps = gst_caps_new_simple ("video/mpeg",
818           "mpegversion", G_TYPE_INT,
819           bstream->stream_type == GST_MPEG_TS_STREAM_TYPE_VIDEO_MPEG1 ? 1 : 2,
820           "systemstream", G_TYPE_BOOLEAN, FALSE, NULL);
821
822       break;
823     case GST_MPEG_TS_STREAM_TYPE_AUDIO_MPEG1:
824     case GST_MPEG_TS_STREAM_TYPE_AUDIO_MPEG2:
825       GST_LOG ("mpeg audio");
826       template = gst_static_pad_template_get (&audio_template);
827       name = g_strdup_printf ("audio_%04x", bstream->pid);
828       caps =
829           gst_caps_new_simple ("audio/mpeg", "mpegversion", G_TYPE_INT, 1,
830           NULL);
831       /* HDV is always mpeg 1 audio layer 2 */
832       if (program->registration_id == DRF_ID_TSHV)
833         gst_caps_set_simple (caps, "layer", G_TYPE_INT, 2, NULL);
834       break;
835     case GST_MPEG_TS_STREAM_TYPE_PRIVATE_PES_PACKETS:
836       GST_LOG ("private data");
837       /* FIXME: Move all of this into a common method (there might be other
838        * types also, depending on registratino descriptors also
839        */
840       desc = mpegts_get_descriptor_from_stream (bstream, GST_MTS_DESC_DVB_AC3);
841       if (desc) {
842         GST_LOG ("ac3 audio");
843         template = gst_static_pad_template_get (&audio_template);
844         name = g_strdup_printf ("audio_%04x", bstream->pid);
845         caps = gst_caps_new_empty_simple ("audio/x-ac3");
846         break;
847       }
848
849       desc =
850           mpegts_get_descriptor_from_stream (bstream,
851           GST_MTS_DESC_DVB_ENHANCED_AC3);
852       if (desc) {
853         GST_LOG ("ac3 audio");
854         template = gst_static_pad_template_get (&audio_template);
855         name = g_strdup_printf ("audio_%04x", bstream->pid);
856         caps = gst_caps_new_empty_simple ("audio/x-eac3");
857         break;
858       }
859       desc =
860           mpegts_get_descriptor_from_stream (bstream,
861           GST_MTS_DESC_DVB_TELETEXT);
862       if (desc) {
863         GST_LOG ("teletext");
864         template = gst_static_pad_template_get (&private_template);
865         name = g_strdup_printf ("private_%04x", bstream->pid);
866         caps = gst_caps_new_empty_simple ("application/x-teletext");
867         break;
868       }
869       desc =
870           mpegts_get_descriptor_from_stream (bstream,
871           GST_MTS_DESC_DVB_SUBTITLING);
872       if (desc) {
873         GST_LOG ("subtitling");
874         template = gst_static_pad_template_get (&private_template);
875         name = g_strdup_printf ("private_%04x", bstream->pid);
876         caps = gst_caps_new_empty_simple ("subpicture/x-dvb");
877         break;
878       }
879
880       switch (bstream->registration_id) {
881         case DRF_ID_DTS1:
882         case DRF_ID_DTS2:
883         case DRF_ID_DTS3:
884           /* SMPTE registered DTS */
885           GST_LOG ("subtitling");
886           template = gst_static_pad_template_get (&private_template);
887           name = g_strdup_printf ("private_%04x", bstream->pid);
888           caps = gst_caps_new_empty_simple ("audio/x-dts");
889           break;
890         case DRF_ID_S302M:
891           template = gst_static_pad_template_get (&audio_template);
892           name = g_strdup_printf ("audio_%04x", bstream->pid);
893           caps = gst_caps_new_empty_simple ("audio/x-smpte-302m");
894           break;
895         case DRF_ID_HEVC:
896           template = gst_static_pad_template_get (&video_template);
897           name = g_strdup_printf ("video_%04x", bstream->pid);
898           caps = gst_caps_new_simple ("video/x-h265",
899               "stream-format", G_TYPE_STRING, "byte-stream",
900               "alignment", G_TYPE_STRING, "nal", NULL);
901           break;
902       }
903       if (template)
904         break;
905
906       /* hack for itv hd (sid 10510, video pid 3401 */
907       if (program->program_number == 10510 && bstream->pid == 3401) {
908         template = gst_static_pad_template_get (&video_template);
909         name = g_strdup_printf ("video_%04x", bstream->pid);
910         caps = gst_caps_new_simple ("video/x-h264",
911             "stream-format", G_TYPE_STRING, "byte-stream",
912             "alignment", G_TYPE_STRING, "nal", NULL);
913       }
914       break;
915     case ST_HDV_AUX_V:
916       /* FIXME : Should only be used with specific PMT registration_descriptor */
917       /* We don't expose those streams since they're only helper streams */
918       /* template = gst_static_pad_template_get (&private_template); */
919       /* name = g_strdup_printf ("private_%04x", bstream->pid); */
920       /* caps = gst_caps_new_simple ("hdv/aux-v", NULL); */
921       break;
922     case ST_HDV_AUX_A:
923       /* FIXME : Should only be used with specific PMT registration_descriptor */
924       /* We don't expose those streams since they're only helper streams */
925       /* template = gst_static_pad_template_get (&private_template); */
926       /* name = g_strdup_printf ("private_%04x", bstream->pid); */
927       /* caps = gst_caps_new_simple ("hdv/aux-a", NULL); */
928       break;
929     case GST_MPEG_TS_STREAM_TYPE_AUDIO_AAC_ADTS:
930       template = gst_static_pad_template_get (&audio_template);
931       name = g_strdup_printf ("audio_%04x", bstream->pid);
932       caps = gst_caps_new_simple ("audio/mpeg",
933           "mpegversion", G_TYPE_INT, 2,
934           "stream-format", G_TYPE_STRING, "adts", NULL);
935       break;
936     case GST_MPEG_TS_STREAM_TYPE_AUDIO_AAC_LATM:
937       template = gst_static_pad_template_get (&audio_template);
938       name = g_strdup_printf ("audio_%04x", bstream->pid);
939       caps = gst_caps_new_simple ("audio/mpeg",
940           "mpegversion", G_TYPE_INT, 4,
941           "stream-format", G_TYPE_STRING, "loas", NULL);
942       break;
943     case GST_MPEG_TS_STREAM_TYPE_VIDEO_MPEG4:
944       template = gst_static_pad_template_get (&video_template);
945       name = g_strdup_printf ("video_%04x", bstream->pid);
946       caps = gst_caps_new_simple ("video/mpeg",
947           "mpegversion", G_TYPE_INT, 4,
948           "systemstream", G_TYPE_BOOLEAN, FALSE, NULL);
949       break;
950     case GST_MPEG_TS_STREAM_TYPE_VIDEO_H264:
951       template = gst_static_pad_template_get (&video_template);
952       name = g_strdup_printf ("video_%04x", bstream->pid);
953       caps = gst_caps_new_simple ("video/x-h264",
954           "stream-format", G_TYPE_STRING, "byte-stream",
955           "alignment", G_TYPE_STRING, "nal", NULL);
956       break;
957     case GST_MPEG_TS_STREAM_TYPE_VIDEO_HEVC:
958       template = gst_static_pad_template_get (&video_template);
959       name = g_strdup_printf ("video_%04x", bstream->pid);
960       caps = gst_caps_new_simple ("video/x-h265",
961           "stream-format", G_TYPE_STRING, "byte-stream",
962           "alignment", G_TYPE_STRING, "nal", NULL);
963       break;
964     case ST_VIDEO_DIRAC:
965       if (bstream->registration_id == 0x64726163) {
966         GST_LOG ("dirac");
967         /* dirac in hex */
968         template = gst_static_pad_template_get (&video_template);
969         name = g_strdup_printf ("video_%04x", bstream->pid);
970         caps = gst_caps_new_empty_simple ("video/x-dirac");
971       }
972       break;
973     case ST_PRIVATE_EA:        /* Try to detect a VC1 stream */
974     {
975       gboolean is_vc1 = FALSE;
976
977       /* Note/FIXME: RP-227 specifies that the registration descriptor
978        * for vc1 can also contain other information, such as profile,
979        * level, alignment, buffer_size, .... */
980       if (bstream->registration_id == DRF_ID_VC1)
981         is_vc1 = TRUE;
982       if (!is_vc1) {
983         GST_WARNING ("0xea private stream type found but no descriptor "
984             "for VC1. Assuming plain VC1.");
985       }
986
987       template = gst_static_pad_template_get (&video_template);
988       name = g_strdup_printf ("video_%04x", bstream->pid);
989       caps = gst_caps_new_simple ("video/x-wmv",
990           "wmvversion", G_TYPE_INT, 3, "format", G_TYPE_STRING, "WVC1", NULL);
991
992       break;
993     }
994     case ST_PS_AUDIO_AC3:
995       /* DVB_ENHANCED_AC3 */
996       desc =
997           mpegts_get_descriptor_from_stream (bstream,
998           GST_MTS_DESC_DVB_ENHANCED_AC3);
999       if (desc) {
1000         template = gst_static_pad_template_get (&audio_template);
1001         name = g_strdup_printf ("audio_%04x", bstream->pid);
1002         caps = gst_caps_new_empty_simple ("audio/x-eac3");
1003         break;
1004       }
1005
1006       /* If stream has ac3 descriptor 
1007        * OR program is ATSC (GA94)
1008        * OR stream registration is AC-3
1009        * then it's regular AC3 */
1010       if (bstream->registration_id == DRF_ID_AC3 ||
1011           program->registration_id == DRF_ID_GA94 ||
1012           mpegts_get_descriptor_from_stream (bstream, GST_MTS_DESC_DVB_AC3)) {
1013         template = gst_static_pad_template_get (&audio_template);
1014         name = g_strdup_printf ("audio_%04x", bstream->pid);
1015         caps = gst_caps_new_empty_simple ("audio/x-ac3");
1016         break;
1017       }
1018
1019       GST_WARNING ("AC3 stream type found but no guaranteed "
1020           "way found to differentiate between AC3 and EAC3. "
1021           "Assuming plain AC3.");
1022       template = gst_static_pad_template_get (&audio_template);
1023       name = g_strdup_printf ("audio_%04x", bstream->pid);
1024       caps = gst_caps_new_empty_simple ("audio/x-ac3");
1025       break;
1026     case ST_PS_AUDIO_DTS:
1027       template = gst_static_pad_template_get (&audio_template);
1028       name = g_strdup_printf ("audio_%04x", bstream->pid);
1029       caps = gst_caps_new_empty_simple ("audio/x-dts");
1030       break;
1031     case ST_PS_AUDIO_LPCM:
1032       template = gst_static_pad_template_get (&audio_template);
1033       name = g_strdup_printf ("audio_%04x", bstream->pid);
1034       caps = gst_caps_new_empty_simple ("audio/x-lpcm");
1035       break;
1036     case ST_PS_DVD_SUBPICTURE:
1037       template = gst_static_pad_template_get (&subpicture_template);
1038       name = g_strdup_printf ("subpicture_%04x", bstream->pid);
1039       caps = gst_caps_new_empty_simple ("subpicture/x-dvd");
1040       break;
1041     default:
1042       GST_WARNING ("Non-media stream (stream_type:0x%x). Not creating pad",
1043           bstream->stream_type);
1044       break;
1045   }
1046
1047 done:
1048   if (template && name && caps) {
1049     GstEvent *event;
1050     gchar *stream_id;
1051
1052     GST_LOG ("stream:%p creating pad with name %s and caps %" GST_PTR_FORMAT,
1053         stream, name, caps);
1054     pad = gst_pad_new_from_template (template, name);
1055     gst_pad_set_active (pad, TRUE);
1056     gst_pad_use_fixed_caps (pad);
1057     stream_id =
1058         gst_pad_create_stream_id_printf (pad, GST_ELEMENT_CAST (base), "%08x",
1059         bstream->pid);
1060
1061     event = gst_pad_get_sticky_event (base->sinkpad, GST_EVENT_STREAM_START, 0);
1062     if (event) {
1063       if (gst_event_parse_group_id (event, &demux->group_id))
1064         demux->have_group_id = TRUE;
1065       else
1066         demux->have_group_id = FALSE;
1067       gst_event_unref (event);
1068     } else if (!demux->have_group_id) {
1069       demux->have_group_id = TRUE;
1070       demux->group_id = gst_util_group_id_next ();
1071     }
1072     event = gst_event_new_stream_start (stream_id);
1073     if (demux->have_group_id)
1074       gst_event_set_group_id (event, demux->group_id);
1075
1076     gst_pad_push_event (pad, event);
1077     g_free (stream_id);
1078     gst_pad_set_caps (pad, caps);
1079     if (!stream->taglist)
1080       stream->taglist = gst_tag_list_new_empty ();
1081     gst_pb_utils_add_codec_description_to_tag_list (stream->taglist, NULL,
1082         caps);
1083     gst_pad_set_query_function (pad, gst_ts_demux_srcpad_query);
1084     gst_pad_set_event_function (pad, gst_ts_demux_srcpad_event);
1085   }
1086
1087   if (name)
1088     g_free (name);
1089   if (template)
1090     gst_object_unref (template);
1091   if (caps)
1092     gst_caps_unref (caps);
1093
1094   return pad;
1095 }
1096
1097 static void
1098 gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * bstream,
1099     MpegTSBaseProgram * program)
1100 {
1101   TSDemuxStream *stream = (TSDemuxStream *) bstream;
1102
1103   if (!stream->pad) {
1104     /* Create the pad */
1105     if (bstream->stream_type != 0xff)
1106       stream->pad = create_pad_for_stream (base, bstream, program);
1107     stream->active = FALSE;
1108
1109     stream->need_newsegment = TRUE;
1110     stream->pts = GST_CLOCK_TIME_NONE;
1111     stream->dts = GST_CLOCK_TIME_NONE;
1112     stream->raw_pts = -1;
1113     stream->raw_dts = -1;
1114     stream->pending_ts = TRUE;
1115     stream->first_dts = GST_CLOCK_TIME_NONE;
1116     stream->continuity_counter = CONTINUITY_UNSET;
1117   }
1118   stream->flow_return = GST_FLOW_OK;
1119 }
1120
1121 static void
1122 gst_ts_demux_stream_removed (MpegTSBase * base, MpegTSBaseStream * bstream)
1123 {
1124   TSDemuxStream *stream = (TSDemuxStream *) bstream;
1125
1126   if (stream->pad) {
1127     if (stream->active && gst_pad_is_active (stream->pad)) {
1128       /* Flush out all data */
1129       GST_DEBUG_OBJECT (stream->pad, "Flushing out pending data");
1130       gst_ts_demux_push_pending_data ((GstTSDemux *) base, stream);
1131
1132       GST_DEBUG_OBJECT (stream->pad, "Pushing out EOS");
1133       gst_pad_push_event (stream->pad, gst_event_new_eos ());
1134       GST_DEBUG_OBJECT (stream->pad, "Deactivating and removing pad");
1135       gst_pad_set_active (stream->pad, FALSE);
1136       gst_element_remove_pad (GST_ELEMENT_CAST (base), stream->pad);
1137       stream->active = FALSE;
1138     }
1139     stream->pad = NULL;
1140   }
1141   gst_ts_demux_stream_flush (stream);
1142   stream->flow_return = GST_FLOW_NOT_LINKED;
1143 }
1144
1145 static void
1146 activate_pad_for_stream (GstTSDemux * tsdemux, TSDemuxStream * stream)
1147 {
1148   GList *tmp;
1149   gboolean alldone = TRUE;
1150
1151   if (stream->pad) {
1152     GST_DEBUG_OBJECT (tsdemux, "Activating pad %s:%s for stream %p",
1153         GST_DEBUG_PAD_NAME (stream->pad), stream);
1154     gst_element_add_pad ((GstElement *) tsdemux, stream->pad);
1155     stream->active = TRUE;
1156     GST_DEBUG_OBJECT (stream->pad, "done adding pad");
1157
1158     /* Check if all pads were activated, and if so emit no-more-pads */
1159     for (tmp = tsdemux->program->stream_list; tmp; tmp = tmp->next) {
1160       stream = (TSDemuxStream *) tmp->data;
1161       if (stream->pad && !stream->active)
1162         alldone = FALSE;
1163     }
1164     if (alldone) {
1165       GST_DEBUG_OBJECT (tsdemux, "All pads were activated, emit no-more-pads");
1166       gst_element_no_more_pads ((GstElement *) tsdemux);
1167     }
1168   } else
1169     GST_WARNING_OBJECT (tsdemux,
1170         "stream %p (pid 0x%04x, type:0x%03x) has no pad", stream,
1171         ((MpegTSBaseStream *) stream)->pid,
1172         ((MpegTSBaseStream *) stream)->stream_type);
1173 }
1174
1175 static void
1176 gst_ts_demux_stream_flush (TSDemuxStream * stream)
1177 {
1178   GST_DEBUG ("flushing stream %p", stream);
1179
1180   if (stream->data)
1181     g_free (stream->data);
1182   stream->data = NULL;
1183   stream->state = PENDING_PACKET_EMPTY;
1184   stream->expected_size = 0;
1185   stream->allocated_size = 0;
1186   stream->current_size = 0;
1187   stream->need_newsegment = TRUE;
1188   stream->pts = GST_CLOCK_TIME_NONE;
1189   stream->dts = GST_CLOCK_TIME_NONE;
1190   stream->first_dts = GST_CLOCK_TIME_NONE;
1191   stream->raw_pts = -1;
1192   stream->raw_dts = -1;
1193   if (stream->flow_return == GST_FLOW_FLUSHING) {
1194     stream->flow_return = GST_FLOW_OK;
1195   }
1196   stream->continuity_counter = CONTINUITY_UNSET;
1197 }
1198
1199 static void
1200 gst_ts_demux_flush_streams (GstTSDemux * demux)
1201 {
1202   g_list_foreach (demux->program->stream_list,
1203       (GFunc) gst_ts_demux_stream_flush, NULL);
1204 }
1205
1206 static void
1207 gst_ts_demux_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
1208 {
1209   GstTSDemux *demux = GST_TS_DEMUX (base);
1210
1211   GST_DEBUG ("Current program %d, new program %d requested program %d",
1212       (gint) demux->program_number, program->program_number,
1213       demux->requested_program_number);
1214
1215   if (demux->requested_program_number == program->program_number ||
1216       (demux->requested_program_number == -1 && demux->program_number == -1)) {
1217
1218     GST_LOG ("program %d started", program->program_number);
1219     demux->program_number = program->program_number;
1220     demux->program = program;
1221
1222     /* If this is not the initial program, we need to calculate
1223      * an update newsegment */
1224     demux->calculate_update_segment = !program->initial_program;
1225
1226     /* FIXME : When do we emit no_more_pads ? */
1227   }
1228 }
1229
1230 static void
1231 gst_ts_demux_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program)
1232 {
1233   GstTSDemux *demux = GST_TS_DEMUX (base);
1234
1235   if (demux->program == program) {
1236     demux->program = NULL;
1237     demux->program_number = -1;
1238   }
1239 }
1240
1241
1242 static inline void
1243 gst_ts_demux_record_pts (GstTSDemux * demux, TSDemuxStream * stream,
1244     guint64 pts, guint64 offset)
1245 {
1246   MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
1247
1248   stream->raw_pts = pts;
1249   if (pts == -1) {
1250     stream->pts = GST_CLOCK_TIME_NONE;
1251     return;
1252   }
1253
1254   GST_LOG ("pid 0x%04x raw pts:%" G_GUINT64_FORMAT " at offset %"
1255       G_GUINT64_FORMAT, bs->pid, pts, offset);
1256
1257   /* Compute PTS in GstClockTime */
1258   stream->pts =
1259       mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
1260       MPEGTIME_TO_GSTTIME (pts), demux->program->pcr_pid);
1261
1262   GST_LOG ("pid 0x%04x Stored PTS %" G_GUINT64_FORMAT, bs->pid, stream->pts);
1263
1264   if (G_UNLIKELY (demux->emit_statistics)) {
1265     GstStructure *st;
1266     st = gst_structure_new_id_empty (QUARK_TSDEMUX);
1267     gst_structure_id_set (st,
1268         QUARK_PID, G_TYPE_UINT, bs->pid,
1269         QUARK_OFFSET, G_TYPE_UINT64, offset, QUARK_PTS, G_TYPE_UINT64, pts,
1270         NULL);
1271     gst_element_post_message (GST_ELEMENT_CAST (demux),
1272         gst_message_new_element (GST_OBJECT (demux), st));
1273   }
1274 }
1275
1276 static inline void
1277 gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream,
1278     guint64 dts, guint64 offset)
1279 {
1280   MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
1281
1282   stream->raw_dts = dts;
1283   if (dts == -1) {
1284     stream->dts = GST_CLOCK_TIME_NONE;
1285     return;
1286   }
1287
1288   GST_LOG ("pid 0x%04x raw dts:%" G_GUINT64_FORMAT " at offset %"
1289       G_GUINT64_FORMAT, bs->pid, dts, offset);
1290
1291   /* Compute DTS in GstClockTime */
1292   stream->dts =
1293       mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
1294       MPEGTIME_TO_GSTTIME (dts), demux->program->pcr_pid);
1295
1296   GST_LOG ("pid 0x%04x Stored DTS %" G_GUINT64_FORMAT, bs->pid, stream->dts);
1297
1298   if (G_UNLIKELY (demux->emit_statistics)) {
1299     GstStructure *st;
1300     st = gst_structure_new_id_empty (QUARK_TSDEMUX);
1301     gst_structure_id_set (st,
1302         QUARK_PID, G_TYPE_UINT, bs->pid,
1303         QUARK_OFFSET, G_TYPE_UINT64, offset, QUARK_DTS, G_TYPE_UINT64, dts,
1304         NULL);
1305     gst_element_post_message (GST_ELEMENT_CAST (demux),
1306         gst_message_new_element (GST_OBJECT (demux), st));
1307   }
1308 }
1309
1310 /* This is called when we haven't got a valid initial PTS/DTS on all streams */
1311 static gboolean
1312 check_pending_buffers (GstTSDemux * demux, TSDemuxStream * stream)
1313 {
1314   gboolean have_observation = FALSE;
1315   /* The biggest offset */
1316   guint64 offset = 0;
1317   GList *tmp;
1318
1319   /* 1. Go over all streams */
1320   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
1321     TSDemuxStream *tmpstream = (TSDemuxStream *) tmp->data;
1322     /* 1.1 check if at least one stream got a valid DTS */
1323     if ((tmpstream->raw_dts != -1 && tmpstream->dts != GST_CLOCK_TIME_NONE) ||
1324         (tmpstream->raw_pts != -1 && tmpstream->pts != GST_CLOCK_TIME_NONE)) {
1325       have_observation = TRUE;
1326       break;
1327     }
1328   }
1329
1330   /* 2. If we don't have a valid value yet, break out */
1331   if (have_observation == FALSE)
1332     return FALSE;
1333
1334   /* 3. Go over all streams that have current/pending data */
1335   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
1336     TSDemuxStream *tmpstream = (TSDemuxStream *) tmp->data;
1337     PendingBuffer *pend;
1338     guint64 firstval, lastval, ts;
1339
1340     /* 3.1 Calculate the offset between current DTS and first DTS */
1341     if (tmpstream->pending == NULL || tmpstream->state == PENDING_PACKET_EMPTY)
1342       continue;
1343     /* If we don't have any pending data, the offset is 0 for this stream */
1344     if (tmpstream->pending == NULL)
1345       break;
1346     if (tmpstream->raw_dts != -1)
1347       lastval = tmpstream->raw_dts;
1348     else if (tmpstream->raw_pts != -1)
1349       lastval = tmpstream->raw_pts;
1350     else {
1351       GST_WARNING ("Don't have a last DTS/PTS to use for offset recalculation");
1352       continue;
1353     }
1354     pend = tmpstream->pending->data;
1355     if (pend->dts != -1)
1356       firstval = pend->dts;
1357     else if (pend->pts != -1)
1358       firstval = pend->pts;
1359     else {
1360       GST_WARNING
1361           ("Don't have a first DTS/PTS to use for offset recalculation");
1362       continue;
1363     }
1364     /* 3.2 Add to the offset the report TS for the current DTS */
1365     ts = mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
1366         MPEGTIME_TO_GSTTIME (lastval), demux->program->pcr_pid);
1367     if (ts == GST_CLOCK_TIME_NONE) {
1368       GST_WARNING ("THIS SHOULD NOT HAPPEN !");
1369       continue;
1370     }
1371     ts += MPEGTIME_TO_GSTTIME (lastval - firstval);
1372     /* 3.3 If that offset is bigger than the current offset, store it */
1373     if (ts > offset)
1374       offset = ts;
1375   }
1376
1377   GST_DEBUG ("New initial pcr_offset %" GST_TIME_FORMAT,
1378       GST_TIME_ARGS (offset));
1379
1380   /* 4. Set the offset on the packetizer */
1381   mpegts_packetizer_set_current_pcr_offset (MPEG_TS_BASE_PACKETIZER (demux),
1382       offset, demux->program->pcr_pid);
1383
1384   /* 4. Go over all streams */
1385   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
1386     TSDemuxStream *stream = (TSDemuxStream *) tmp->data;
1387
1388     stream->pending_ts = FALSE;
1389     /* 4.1 Set pending_ts for FALSE */
1390
1391     /* 4.2 Recalculate PTS/DTS (in running time) for pending data */
1392     if (stream->pending) {
1393       GList *tmp2;
1394       for (tmp2 = stream->pending; tmp2; tmp2 = tmp2->next) {
1395         PendingBuffer *pend = (PendingBuffer *) tmp2->data;
1396         if (pend->pts != -1)
1397           GST_BUFFER_PTS (pend->buffer) =
1398               mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
1399               MPEGTIME_TO_GSTTIME (pend->pts), demux->program->pcr_pid);
1400         if (pend->dts != -1)
1401           GST_BUFFER_DTS (pend->buffer) =
1402               mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
1403               MPEGTIME_TO_GSTTIME (pend->dts), demux->program->pcr_pid);
1404         /* 4.2.2 Set first_dts to TS of lowest DTS (for segment) */
1405         if (stream->first_dts == GST_CLOCK_TIME_NONE) {
1406           if (GST_BUFFER_DTS (pend->buffer) != GST_CLOCK_TIME_NONE)
1407             stream->first_dts = GST_BUFFER_DTS (pend->buffer);
1408           else if (GST_BUFFER_PTS (pend->buffer) != GST_CLOCK_TIME_NONE)
1409             stream->first_dts = GST_BUFFER_PTS (pend->buffer);
1410         }
1411       }
1412     }
1413     /* Recalculate PTS/DTS (in running time) for current data */
1414     if (stream->state != PENDING_PACKET_EMPTY) {
1415       if (stream->raw_dts != -1) {
1416         stream->dts =
1417             mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
1418             MPEGTIME_TO_GSTTIME (stream->raw_dts), demux->program->pcr_pid);
1419         if (stream->first_dts == GST_CLOCK_TIME_NONE)
1420           stream->first_dts = stream->dts;
1421       }
1422       if (stream->raw_pts != -1) {
1423         stream->pts =
1424             mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux),
1425             MPEGTIME_TO_GSTTIME (stream->raw_pts), demux->program->pcr_pid);
1426         if (stream->first_dts == GST_CLOCK_TIME_NONE)
1427           stream->first_dts = stream->pts;
1428       }
1429     }
1430   }
1431
1432   return TRUE;
1433 }
1434
1435 static void
1436 gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream,
1437     guint8 * data, guint32 length, guint64 bufferoffset)
1438 {
1439   PESHeader header;
1440   PESParsingResult parseres;
1441
1442   GST_MEMDUMP ("Header buffer", data, MIN (length, 32));
1443
1444   parseres = mpegts_parse_pes_header (data, length, &header);
1445   if (G_UNLIKELY (parseres == PES_PARSING_NEED_MORE))
1446     goto discont;
1447   if (G_UNLIKELY (parseres == PES_PARSING_BAD)) {
1448     GST_WARNING ("Error parsing PES header. pid: 0x%x stream_type: 0x%x",
1449         stream->stream.pid, stream->stream.stream_type);
1450     goto discont;
1451   }
1452
1453   gst_ts_demux_record_dts (demux, stream, header.DTS, bufferoffset);
1454   gst_ts_demux_record_pts (demux, stream, header.PTS, bufferoffset);
1455   if (G_UNLIKELY (stream->pending_ts &&
1456           (stream->pts != GST_CLOCK_TIME_NONE
1457               || stream->dts != GST_CLOCK_TIME_NONE))) {
1458     GST_DEBUG ("Got pts/dts update, rechecking all streams");
1459     check_pending_buffers (demux, stream);
1460   } else if (stream->first_dts == GST_CLOCK_TIME_NONE) {
1461     if (GST_CLOCK_TIME_IS_VALID (stream->dts))
1462       stream->first_dts = stream->dts;
1463     else if (GST_CLOCK_TIME_IS_VALID (stream->pts))
1464       stream->first_dts = stream->pts;
1465   }
1466
1467   GST_DEBUG_OBJECT (demux,
1468       "stream PTS %" GST_TIME_FORMAT " DTS %" GST_TIME_FORMAT,
1469       GST_TIME_ARGS (stream->pts), GST_TIME_ARGS (stream->dts));
1470
1471   /* Remove PES headers */
1472   GST_DEBUG ("Moving data forward by %d bytes (packet_size:%d, have:%d)",
1473       header.header_size, header.packet_length, length);
1474   stream->expected_size = header.packet_length;
1475   if (stream->expected_size) {
1476     if (G_LIKELY (stream->expected_size > header.header_size)) {
1477       stream->expected_size -= header.header_size;
1478     } else {
1479       /* next packet will have to complete this one */
1480       GST_ERROR ("invalid header and packet size combination");
1481       stream->expected_size = 0;
1482     }
1483   }
1484   data += header.header_size;
1485   length -= header.header_size;
1486
1487   /* Create the output buffer */
1488   if (stream->expected_size)
1489     stream->allocated_size = MAX (stream->expected_size, length);
1490   else
1491     stream->allocated_size = MAX (8192, length);
1492
1493   g_assert (stream->data == NULL);
1494   stream->data = g_malloc (stream->allocated_size);
1495   memcpy (stream->data, data, length);
1496   stream->current_size = length;
1497
1498   stream->state = PENDING_PACKET_BUFFER;
1499
1500   return;
1501
1502 discont:
1503   stream->state = PENDING_PACKET_DISCONT;
1504   return;
1505 }
1506
1507  /* ONLY CALL THIS:
1508   * * WITH packet->payload != NULL
1509   * * WITH pending/current flushed out if beginning of new PES packet
1510   */
1511 static inline void
1512 gst_ts_demux_queue_data (GstTSDemux * demux, TSDemuxStream * stream,
1513     MpegTSPacketizerPacket * packet)
1514 {
1515   guint8 *data;
1516   guint size;
1517   guint8 cc = FLAGS_CONTINUITY_COUNTER (packet->scram_afc_cc);
1518
1519   GST_LOG ("pid: 0x%04x state:%d", stream->stream.pid, stream->state);
1520
1521   size = packet->data_end - packet->payload;
1522   data = packet->payload;
1523
1524   if (stream->continuity_counter == CONTINUITY_UNSET) {
1525     GST_DEBUG ("CONTINUITY: Initialize to %d", cc);
1526   } else if ((cc == stream->continuity_counter + 1 ||
1527           (stream->continuity_counter == MAX_CONTINUITY && cc == 0))) {
1528     GST_LOG ("CONTINUITY: Got expected %d", cc);
1529   } else {
1530     GST_WARNING ("CONTINUITY: Mismatch packet %d, stream %d",
1531         cc, stream->continuity_counter);
1532     stream->state = PENDING_PACKET_DISCONT;
1533   }
1534   stream->continuity_counter = cc;
1535
1536   if (stream->state == PENDING_PACKET_EMPTY) {
1537     if (G_UNLIKELY (!packet->payload_unit_start_indicator)) {
1538       stream->state = PENDING_PACKET_DISCONT;
1539       GST_DEBUG ("Didn't get the first packet of this PES");
1540     } else {
1541       GST_LOG ("EMPTY=>HEADER");
1542       stream->state = PENDING_PACKET_HEADER;
1543     }
1544   }
1545
1546   switch (stream->state) {
1547     case PENDING_PACKET_HEADER:
1548     {
1549       GST_LOG ("HEADER: Parsing PES header");
1550
1551       /* parse the header */
1552       gst_ts_demux_parse_pes_header (demux, stream, data, size, packet->offset);
1553       break;
1554     }
1555     case PENDING_PACKET_BUFFER:
1556     {
1557       GST_LOG ("BUFFER: appending data");
1558       if (G_UNLIKELY (stream->current_size + size > stream->allocated_size)) {
1559         GST_LOG ("resizing buffer");
1560         do {
1561           stream->allocated_size *= 2;
1562         } while (stream->current_size + size > stream->allocated_size);
1563         stream->data = g_realloc (stream->data, stream->allocated_size);
1564       }
1565       memcpy (stream->data + stream->current_size, data, size);
1566       stream->current_size += size;
1567       break;
1568     }
1569     case PENDING_PACKET_DISCONT:
1570     {
1571       GST_LOG ("DISCONT: not storing/pushing");
1572       if (G_UNLIKELY (stream->data)) {
1573         g_free (stream->data);
1574         stream->data = NULL;
1575       }
1576       stream->continuity_counter = CONTINUITY_UNSET;
1577       break;
1578     }
1579     default:
1580       break;
1581   }
1582
1583   return;
1584 }
1585
1586 static void
1587 calculate_and_push_newsegment (GstTSDemux * demux, TSDemuxStream * stream)
1588 {
1589   MpegTSBase *base = (MpegTSBase *) demux;
1590   GstClockTime lowest_pts = GST_CLOCK_TIME_NONE;
1591   GstClockTime firstts = 0;
1592   GList *tmp;
1593
1594   GST_DEBUG ("Creating new newsegment for stream %p", stream);
1595
1596   /* 1) If we need to calculate an update newsegment, do it
1597    * 2) If we need to calculate a new newsegment, do it
1598    * 3) If an update_segment is valid, push it
1599    * 4) If a newsegment is valid, push it */
1600
1601   /* Speedup : if we don't need to calculate anything, go straight to pushing */
1602   if (!demux->calculate_update_segment && demux->segment_event)
1603     goto push_new_segment;
1604
1605   /* Calculate the 'new_start' value, used for both updates and newsegment */
1606   for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) {
1607     TSDemuxStream *pstream = (TSDemuxStream *) tmp->data;
1608
1609     if (GST_CLOCK_TIME_IS_VALID (pstream->first_dts)) {
1610       if (!GST_CLOCK_TIME_IS_VALID (lowest_pts)
1611           || pstream->first_dts < lowest_pts)
1612         lowest_pts = pstream->first_dts;
1613     }
1614   }
1615   if (GST_CLOCK_TIME_IS_VALID (lowest_pts))
1616     firstts = lowest_pts;
1617   GST_DEBUG ("lowest_pts %" G_GUINT64_FORMAT " => clocktime %" GST_TIME_FORMAT,
1618       lowest_pts, GST_TIME_ARGS (firstts));
1619
1620   if (demux->calculate_update_segment) {
1621     GST_DEBUG ("Calculating update segment");
1622     /* If we have a valid segment, create an update of that */
1623     if (demux->segment.format == GST_FORMAT_TIME) {
1624       GstSegment update_segment;
1625       GST_DEBUG ("Re-using segment " SEGMENT_FORMAT,
1626           SEGMENT_ARGS (demux->segment));
1627       gst_segment_copy_into (&demux->segment, &update_segment);
1628       update_segment.stop = firstts;
1629       demux->update_segment = gst_event_new_segment (&update_segment);
1630     }
1631     demux->calculate_update_segment = FALSE;
1632   }
1633
1634   if (demux->segment.format != GST_FORMAT_TIME) {
1635     /* It will happen only if it's first program or after flushes. */
1636     GST_DEBUG ("Calculating actual segment");
1637     if (base->segment.format == GST_FORMAT_TIME) {
1638       /* Try to recover segment info from base if it's in TIME format */
1639       demux->segment = base->segment;
1640     } else {
1641       /* Start from the first ts/pts */
1642       gst_segment_init (&demux->segment, GST_FORMAT_TIME);
1643       demux->segment.start = firstts;
1644       demux->segment.stop = GST_CLOCK_TIME_NONE;
1645       demux->segment.position = firstts;
1646       demux->segment.time = firstts;
1647       demux->segment.rate = demux->rate;
1648     }
1649   }
1650
1651   if (!demux->segment_event) {
1652     demux->segment_event = gst_event_new_segment (&demux->segment);
1653     GST_EVENT_SEQNUM (demux->segment_event) = base->last_seek_seqnum;
1654   }
1655
1656 push_new_segment:
1657   if (demux->update_segment) {
1658     GST_DEBUG_OBJECT (stream->pad, "Pushing update segment");
1659     gst_event_ref (demux->update_segment);
1660     gst_pad_push_event (stream->pad, demux->update_segment);
1661   }
1662
1663   if (demux->segment_event) {
1664     GST_DEBUG_OBJECT (stream->pad, "Pushing newsegment event");
1665     gst_event_ref (demux->segment_event);
1666     gst_pad_push_event (stream->pad, demux->segment_event);
1667   }
1668
1669   /* Push pending tags */
1670   if (stream->taglist) {
1671     GST_DEBUG_OBJECT (stream->pad, "Sending tags %" GST_PTR_FORMAT,
1672         stream->taglist);
1673     gst_pad_push_event (stream->pad, gst_event_new_tag (stream->taglist));
1674     stream->taglist = NULL;
1675   }
1676
1677   stream->need_newsegment = FALSE;
1678 }
1679
1680 static GstFlowReturn
1681 gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream)
1682 {
1683   GstFlowReturn res = GST_FLOW_OK;
1684 #ifndef GST_DISABLE_GST_DEBUG
1685   MpegTSBaseStream *bs = (MpegTSBaseStream *) stream;
1686 #endif
1687   GstBuffer *buffer = NULL;
1688
1689   GST_DEBUG_OBJECT (stream->pad,
1690       "stream:%p, pid:0x%04x stream_type:%d state:%d", stream, bs->pid,
1691       bs->stream_type, stream->state);
1692
1693   if (G_UNLIKELY (stream->data == NULL)) {
1694     GST_LOG ("stream->data == NULL");
1695     goto beach;
1696   }
1697
1698   if (G_UNLIKELY (stream->state == PENDING_PACKET_EMPTY)) {
1699     GST_LOG ("EMPTY: returning");
1700     goto beach;
1701   }
1702
1703   if (G_UNLIKELY (stream->state != PENDING_PACKET_BUFFER)) {
1704     GST_LOG ("state:%d, returning", stream->state);
1705     goto beach;
1706   }
1707
1708   if (G_UNLIKELY (demux->program == NULL)) {
1709     GST_LOG_OBJECT (demux, "No program");
1710     g_free (stream->data);
1711     goto beach;
1712   }
1713
1714   buffer = gst_buffer_new_wrapped (stream->data, stream->current_size);
1715
1716   if (G_UNLIKELY (stream->pending_ts && !check_pending_buffers (demux, stream))) {
1717     PendingBuffer *pend;
1718     pend = g_slice_new0 (PendingBuffer);
1719     pend->buffer = buffer;
1720     pend->pts = stream->raw_pts;
1721     pend->dts = stream->raw_dts;
1722     stream->pending = g_list_append (stream->pending, pend);
1723     GST_DEBUG ("Not enough information to push buffers yet, storing buffer");
1724     goto beach;
1725   }
1726
1727   if (G_UNLIKELY (!stream->active))
1728     activate_pad_for_stream (demux, stream);
1729
1730   if (G_UNLIKELY (stream->need_newsegment))
1731     calculate_and_push_newsegment (demux, stream);
1732
1733   /* FIXME : Push pending buffers if any */
1734   if (G_UNLIKELY (stream->pending)) {
1735     GList *tmp;
1736     for (tmp = stream->pending; tmp; tmp = tmp->next) {
1737       PendingBuffer *pend = (PendingBuffer *) tmp->data;
1738
1739       GST_DEBUG_OBJECT (stream->pad,
1740           "Pushing pending buffer PTS:%" GST_TIME_FORMAT " DTS:%"
1741           GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (pend->buffer)),
1742           GST_TIME_ARGS (GST_BUFFER_DTS (pend->buffer)));
1743
1744       res = gst_pad_push (stream->pad, pend->buffer);
1745       g_slice_free (PendingBuffer, pend);
1746     }
1747     g_list_free (stream->pending);
1748     stream->pending = NULL;
1749   }
1750
1751   GST_DEBUG_OBJECT (stream->pad, "stream->pts %" GST_TIME_FORMAT,
1752       GST_TIME_ARGS (stream->pts));
1753   if (GST_CLOCK_TIME_IS_VALID (stream->pts))
1754     GST_BUFFER_PTS (buffer) = stream->pts;
1755   if (GST_CLOCK_TIME_IS_VALID (stream->dts))
1756     GST_BUFFER_DTS (buffer) = stream->dts;
1757
1758   GST_DEBUG_OBJECT (stream->pad,
1759       "Pushing buffer with PTS: %" GST_TIME_FORMAT " , DTS: %" GST_TIME_FORMAT,
1760       GST_TIME_ARGS (GST_BUFFER_PTS (buffer)),
1761       GST_TIME_ARGS (GST_BUFFER_DTS (buffer)));
1762
1763   res = gst_pad_push (stream->pad, buffer);
1764   GST_DEBUG_OBJECT (stream->pad, "Returned %s", gst_flow_get_name (res));
1765   res = tsdemux_combine_flows (demux, stream, res);
1766   GST_DEBUG_OBJECT (stream->pad, "combined %s", gst_flow_get_name (res));
1767
1768 beach:
1769   /* Reset everything */
1770   GST_LOG ("Resetting to EMPTY, returning %s", gst_flow_get_name (res));
1771   stream->state = PENDING_PACKET_EMPTY;
1772   stream->data = NULL;
1773   stream->expected_size = 0;
1774   stream->current_size = 0;
1775
1776   return res;
1777 }
1778
1779 static GstFlowReturn
1780 gst_ts_demux_handle_packet (GstTSDemux * demux, TSDemuxStream * stream,
1781     MpegTSPacketizerPacket * packet, GstMpegTsSection * section)
1782 {
1783   GstFlowReturn res = GST_FLOW_OK;
1784
1785   GST_LOG ("pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p", packet->pid,
1786       packet->payload_unit_start_indicator, packet->scram_afc_cc & 0x30,
1787       FLAGS_CONTINUITY_COUNTER (packet->scram_afc_cc), packet->payload);
1788
1789   if (G_UNLIKELY (packet->payload_unit_start_indicator) &&
1790       FLAGS_HAS_PAYLOAD (packet->scram_afc_cc))
1791     /* Flush previous data */
1792     res = gst_ts_demux_push_pending_data (demux, stream);
1793
1794   if (packet->payload && (res == GST_FLOW_OK || res == GST_FLOW_NOT_LINKED)
1795       && stream->pad) {
1796     gst_ts_demux_queue_data (demux, stream, packet);
1797     GST_LOG ("current_size:%d, expected_size:%d",
1798         stream->current_size, stream->expected_size);
1799     /* Finally check if the data we queued completes a packet */
1800     if (stream->expected_size && stream->current_size == stream->expected_size) {
1801       GST_LOG ("pushing complete packet");
1802       res = gst_ts_demux_push_pending_data (demux, stream);
1803     }
1804   }
1805
1806   return res;
1807 }
1808
1809 static void
1810 gst_ts_demux_flush (MpegTSBase * base, gboolean hard)
1811 {
1812   GstTSDemux *demux = GST_TS_DEMUX_CAST (base);
1813
1814   gst_ts_demux_flush_streams (demux);
1815
1816   if (demux->segment_event) {
1817     gst_event_unref (demux->segment_event);
1818     demux->segment_event = NULL;
1819   }
1820   demux->calculate_update_segment = FALSE;
1821   if (hard) {
1822     /* For pull mode seeks the current segment needs to be preserved */
1823     demux->rate = 1.0;
1824     gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
1825   }
1826 }
1827
1828 static GstFlowReturn
1829 gst_ts_demux_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
1830     GstMpegTsSection * section)
1831 {
1832   GstTSDemux *demux = GST_TS_DEMUX_CAST (base);
1833   TSDemuxStream *stream = NULL;
1834   GstFlowReturn res = GST_FLOW_OK;
1835
1836   if (G_LIKELY (demux->program)) {
1837     stream = (TSDemuxStream *) demux->program->streams[packet->pid];
1838
1839     if (stream) {
1840       res = gst_ts_demux_handle_packet (demux, stream, packet, section);
1841     }
1842   }
1843   return res;
1844 }
1845
1846 gboolean
1847 gst_ts_demux_plugin_init (GstPlugin * plugin)
1848 {
1849   GST_DEBUG_CATEGORY_INIT (ts_demux_debug, "tsdemux", 0,
1850       "MPEG transport stream demuxer");
1851   init_pes_parser ();
1852
1853   return gst_element_register (plugin, "tsdemux",
1854       GST_RANK_PRIMARY, GST_TYPE_TS_DEMUX);
1855 }