mpegtsparse: Set delta unit flag on non-random-access buffers
[platform/upstream/gstreamer.git] / gst / mpegtsdemux / mpegtsparse.c
1 /*
2  * mpegtsparse.c - 
3  * Copyright (C) 2007 Alessandro Decina
4  * 
5  * Authors:
6  *   Alessandro Decina <alessandro@nnva.org>
7  *   Zaheer Abbas Merali <zaheerabbas at merali dot org>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24
25 #ifdef HAVE_CONFIG_H
26 #include "config.h"
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <string.h>
32
33 #include "mpegtsbase.h"
34 #include "mpegtsparse.h"
35 #include "gstmpegdesc.h"
36
37 /* latency in mseconds is maximum 100 ms between PCR */
38 #define TS_LATENCY 100
39
40 #define TABLE_ID_UNSET 0xFF
41 #define RUNNING_STATUS_RUNNING 4
42
43 GST_DEBUG_CATEGORY_STATIC (mpegts_parse_debug);
44 #define GST_CAT_DEFAULT mpegts_parse_debug
45
46 typedef struct _MpegTSParsePad MpegTSParsePad;
47
48 typedef struct
49 {
50   MpegTSBaseProgram program;
51   MpegTSParsePad *tspad;
52 } MpegTSParseProgram;
53
54 struct _MpegTSParsePad
55 {
56   GstPad *pad;
57
58   /* the program number that the peer wants on this pad */
59   gint program_number;
60   MpegTSParseProgram *program;
61
62   /* set to FALSE before a push and TRUE after */
63   gboolean pushed;
64
65   /* the return of the latest push */
66   GstFlowReturn flow_return;
67 };
68
69 static GstStaticPadTemplate src_template =
70 GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC,
71     GST_PAD_ALWAYS,
72     GST_STATIC_CAPS ("video/mpegts, " "systemstream = (boolean) true ")
73     );
74
75 static GstStaticPadTemplate program_template =
76 GST_STATIC_PAD_TEMPLATE ("program_%u", GST_PAD_SRC,
77     GST_PAD_REQUEST,
78     GST_STATIC_CAPS ("video/mpegts, " "systemstream = (boolean) true ")
79     );
80
81 enum
82 {
83   PROP_0,
84   PROP_SET_TIMESTAMPS,
85   PROP_SMOOTHING_LATENCY,
86   PROP_PCR_PID,
87   /* FILL ME */
88 };
89
90 static void mpegts_parse_set_property (GObject * object, guint prop_id,
91     const GValue * value, GParamSpec * pspec);
92 static void mpegts_parse_get_property (GObject * object, guint prop_id,
93     GValue * value, GParamSpec * pspec);
94
95 static void
96 mpegts_parse_program_started (MpegTSBase * base, MpegTSBaseProgram * program);
97 static void
98 mpegts_parse_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program);
99
100 static GstFlowReturn
101 mpegts_parse_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
102     GstMpegtsSection * section);
103 static void mpegts_parse_inspect_packet (MpegTSBase * base,
104     MpegTSPacketizerPacket * packet);
105 static GstFlowReturn mpegts_parse_have_buffer (MpegTSBase * base,
106     GstBuffer * buffer);
107
108 static MpegTSParsePad *mpegts_parse_create_tspad (MpegTSParse2 * parse,
109     const gchar * name);
110 static void mpegts_parse_destroy_tspad (MpegTSParse2 * parse,
111     MpegTSParsePad * tspad);
112
113 static void mpegts_parse_pad_removed (GstElement * element, GstPad * pad);
114 static GstPad *mpegts_parse_request_new_pad (GstElement * element,
115     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
116 static void mpegts_parse_release_pad (GstElement * element, GstPad * pad);
117 static gboolean mpegts_parse_src_pad_query (GstPad * pad, GstObject * parent,
118     GstQuery * query);
119 static gboolean push_event (MpegTSBase * base, GstEvent * event);
120
121 #define mpegts_parse_parent_class parent_class
122 G_DEFINE_TYPE (MpegTSParse2, mpegts_parse, GST_TYPE_MPEGTS_BASE);
123 static void mpegts_parse_reset (MpegTSBase * base);
124 static GstFlowReturn
125 drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all);
126
127 static void
128 mpegts_parse_dispose (GObject * object)
129 {
130   MpegTSParse2 *parse = (MpegTSParse2 *) object;
131
132   gst_flow_combiner_free (parse->flowcombiner);
133
134   GST_CALL_PARENT (G_OBJECT_CLASS, dispose, (object));
135 }
136
137 static void
138 mpegts_parse_class_init (MpegTSParse2Class * klass)
139 {
140   GObjectClass *gobject_class = (GObjectClass *) (klass);
141   GstElementClass *element_class;
142   MpegTSBaseClass *ts_class;
143
144   gobject_class->set_property = mpegts_parse_set_property;
145   gobject_class->get_property = mpegts_parse_get_property;
146   gobject_class->dispose = mpegts_parse_dispose;
147
148   g_object_class_install_property (gobject_class, PROP_SET_TIMESTAMPS,
149       g_param_spec_boolean ("set-timestamps",
150           "Timestamp (or re-timestamp) the output stream",
151           "If set, timestamps will be set on the output buffers using "
152           "PCRs and smoothed over the smoothing-latency period", FALSE,
153           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
154   g_object_class_install_property (gobject_class, PROP_SMOOTHING_LATENCY,
155       g_param_spec_uint ("smoothing-latency", "Smoothing Latency",
156           "Additional latency in microseconds for smoothing jitter in input timestamps on live capture",
157           0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
158   g_object_class_install_property (gobject_class, PROP_PCR_PID,
159       g_param_spec_int ("pcr-pid", "PID containing PCR",
160           "Set the PID to use for PCR values (-1 for auto)",
161           -1, G_MAXINT, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
162
163   element_class = GST_ELEMENT_CLASS (klass);
164   element_class->pad_removed = mpegts_parse_pad_removed;
165   element_class->request_new_pad = mpegts_parse_request_new_pad;
166   element_class->release_pad = mpegts_parse_release_pad;
167
168   gst_element_class_add_static_pad_template (element_class, &src_template);
169   gst_element_class_add_static_pad_template (element_class, &program_template);
170
171   gst_element_class_set_static_metadata (element_class,
172       "MPEG transport stream parser", "Codec/Parser",
173       "Parses MPEG2 transport streams",
174       "Alessandro Decina <alessandro@nnva.org>, "
175       "Zaheer Abbas Merali <zaheerabbas at merali dot org>");
176
177   ts_class = GST_MPEGTS_BASE_CLASS (klass);
178   ts_class->push = GST_DEBUG_FUNCPTR (mpegts_parse_push);
179   ts_class->push_event = GST_DEBUG_FUNCPTR (push_event);
180   ts_class->program_started = GST_DEBUG_FUNCPTR (mpegts_parse_program_started);
181   ts_class->program_stopped = GST_DEBUG_FUNCPTR (mpegts_parse_program_stopped);
182   ts_class->reset = GST_DEBUG_FUNCPTR (mpegts_parse_reset);
183   ts_class->inspect_packet = GST_DEBUG_FUNCPTR (mpegts_parse_inspect_packet);
184 }
185
186 static void
187 mpegts_parse_init (MpegTSParse2 * parse)
188 {
189   MpegTSBase *base = (MpegTSBase *) parse;
190
191   base->program_size = sizeof (MpegTSParseProgram);
192   base->push_data = TRUE;
193   base->push_section = TRUE;
194
195   parse->user_pcr_pid = parse->pcr_pid = -1;
196
197   parse->flowcombiner = gst_flow_combiner_new ();
198
199   parse->srcpad = gst_pad_new_from_static_template (&src_template, "src");
200   gst_flow_combiner_add_pad (parse->flowcombiner, parse->srcpad);
201   parse->first = TRUE;
202   gst_pad_set_query_function (parse->srcpad,
203       GST_DEBUG_FUNCPTR (mpegts_parse_src_pad_query));
204   gst_element_add_pad (GST_ELEMENT (parse), parse->srcpad);
205
206   parse->have_group_id = FALSE;
207   parse->group_id = G_MAXUINT;
208 }
209
210 static void
211 mpegts_parse_reset (MpegTSBase * base)
212 {
213   MpegTSParse2 *parse = (MpegTSParse2 *) base;
214
215   /* Set the various know PIDs we are interested in */
216
217   /* CAT */
218   MPEGTS_BIT_SET (base->known_psi, 1);
219   /* NIT, ST */
220   MPEGTS_BIT_SET (base->known_psi, 0x10);
221   /* SDT, BAT, ST */
222   MPEGTS_BIT_SET (base->known_psi, 0x11);
223   /* EIT, ST, CIT (TS 102 323) */
224   MPEGTS_BIT_SET (base->known_psi, 0x12);
225   /* RST, ST */
226   MPEGTS_BIT_SET (base->known_psi, 0x13);
227   /* RNT (TS 102 323) */
228   MPEGTS_BIT_SET (base->known_psi, 0x16);
229   /* inband signalling */
230   MPEGTS_BIT_SET (base->known_psi, 0x1c);
231   /* measurement */
232   MPEGTS_BIT_SET (base->known_psi, 0x1d);
233   /* DIT */
234   MPEGTS_BIT_SET (base->known_psi, 0x1e);
235   /* SIT */
236   MPEGTS_BIT_SET (base->known_psi, 0x1f);
237
238   parse->first = TRUE;
239   parse->have_group_id = FALSE;
240   parse->group_id = G_MAXUINT;
241
242   g_list_free_full (parse->pending_buffers, (GDestroyNotify) gst_buffer_unref);
243   parse->pending_buffers = NULL;
244
245   parse->current_pcr = GST_CLOCK_TIME_NONE;
246   parse->previous_pcr = GST_CLOCK_TIME_NONE;
247   parse->base_pcr = GST_CLOCK_TIME_NONE;
248   parse->bytes_since_pcr = 0;
249   parse->pcr_pid = parse->user_pcr_pid;
250   parse->ts_offset = 0;
251 }
252
253 static void
254 mpegts_parse_set_property (GObject * object, guint prop_id,
255     const GValue * value, GParamSpec * pspec)
256 {
257   MpegTSParse2 *parse = (MpegTSParse2 *) object;
258
259   switch (prop_id) {
260     case PROP_SET_TIMESTAMPS:
261       parse->set_timestamps = g_value_get_boolean (value);
262       break;
263     case PROP_SMOOTHING_LATENCY:
264       parse->smoothing_latency = GST_USECOND * g_value_get_uint (value);
265       mpegts_packetizer_set_pcr_discont_threshold (GST_MPEGTS_BASE
266           (parse)->packetizer, parse->smoothing_latency);
267       break;
268     case PROP_PCR_PID:
269       parse->pcr_pid = parse->user_pcr_pid = g_value_get_int (value);
270       break;
271     default:
272       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
273   }
274 }
275
276 static void
277 mpegts_parse_get_property (GObject * object, guint prop_id,
278     GValue * value, GParamSpec * pspec)
279 {
280   MpegTSParse2 *parse = (MpegTSParse2 *) object;
281
282   switch (prop_id) {
283     case PROP_SET_TIMESTAMPS:
284       g_value_set_boolean (value, parse->set_timestamps);
285       break;
286     case PROP_SMOOTHING_LATENCY:
287       g_value_set_uint (value, parse->smoothing_latency / GST_USECOND);
288       break;
289     case PROP_PCR_PID:
290       g_value_set_int (value, parse->pcr_pid);
291       break;
292     default:
293       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
294   }
295 }
296
297 static gboolean
298 prepare_src_pad (MpegTSBase * base, MpegTSParse2 * parse)
299 {
300   GstEvent *event;
301   gchar *stream_id;
302   GstCaps *caps;
303
304   if (!parse->first)
305     return TRUE;
306
307   /* If there's no packet_size yet, we can't set caps yet */
308   if (G_UNLIKELY (base->packetizer->packet_size == 0))
309     return FALSE;
310
311   stream_id =
312       gst_pad_create_stream_id (parse->srcpad, GST_ELEMENT_CAST (base),
313       "multi-program");
314
315   event =
316       gst_pad_get_sticky_event (parse->parent.sinkpad, GST_EVENT_STREAM_START,
317       0);
318   if (event) {
319     if (gst_event_parse_group_id (event, &parse->group_id))
320       parse->have_group_id = TRUE;
321     else
322       parse->have_group_id = FALSE;
323     gst_event_unref (event);
324   } else if (!parse->have_group_id) {
325     parse->have_group_id = TRUE;
326     parse->group_id = gst_util_group_id_next ();
327   }
328   event = gst_event_new_stream_start (stream_id);
329   if (parse->have_group_id)
330     gst_event_set_group_id (event, parse->group_id);
331
332   gst_pad_push_event (parse->srcpad, event);
333   g_free (stream_id);
334
335   caps = gst_caps_new_simple ("video/mpegts",
336       "systemstream", G_TYPE_BOOLEAN, TRUE,
337       "packetsize", G_TYPE_INT, base->packetizer->packet_size, NULL);
338
339   gst_pad_set_caps (parse->srcpad, caps);
340   gst_caps_unref (caps);
341
342   /* If setting output timestamps, ensure that the output segment is TIME */
343   if (parse->set_timestamps == FALSE || base->segment.format == GST_FORMAT_TIME)
344     /* Just use the upstream segment */
345     base->out_segment = base->segment;
346   else {
347     GstSegment *seg = &base->out_segment;
348     gst_segment_init (seg, GST_FORMAT_TIME);
349     GST_DEBUG_OBJECT (parse,
350         "Generating time output segment %" GST_SEGMENT_FORMAT, seg);
351   }
352   gst_pad_push_event (parse->srcpad,
353       gst_event_new_segment (&base->out_segment));
354
355   parse->first = FALSE;
356
357   return TRUE;
358 }
359
360 static gboolean
361 push_event (MpegTSBase * base, GstEvent * event)
362 {
363   MpegTSParse2 *parse = (MpegTSParse2 *) base;
364   GList *tmp;
365
366   if (G_UNLIKELY (parse->first)) {
367     /* We will send the segment when really starting  */
368     if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT)) {
369       gst_event_unref (event);
370       return TRUE;
371     }
372     prepare_src_pad (base, parse);
373   }
374   if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_EOS))
375     drain_pending_buffers (parse, TRUE);
376
377   if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
378     parse->ts_offset = 0;
379
380   for (tmp = parse->srcpads; tmp; tmp = tmp->next) {
381     GstPad *pad = (GstPad *) tmp->data;
382     if (pad) {
383       gst_event_ref (event);
384       gst_pad_push_event (pad, event);
385     }
386   }
387
388   gst_pad_push_event (parse->srcpad, event);
389
390   return TRUE;
391 }
392
393 static MpegTSParsePad *
394 mpegts_parse_create_tspad (MpegTSParse2 * parse, const gchar * pad_name)
395 {
396   GstPad *pad;
397   MpegTSParsePad *tspad;
398
399   pad = gst_pad_new_from_static_template (&program_template, pad_name);
400   gst_pad_set_query_function (pad,
401       GST_DEBUG_FUNCPTR (mpegts_parse_src_pad_query));
402
403   /* create our wrapper */
404   tspad = g_new0 (MpegTSParsePad, 1);
405   tspad->pad = pad;
406   tspad->program_number = -1;
407   tspad->program = NULL;
408   tspad->pushed = FALSE;
409   tspad->flow_return = GST_FLOW_NOT_LINKED;
410   gst_pad_set_element_private (pad, tspad);
411   gst_flow_combiner_add_pad (parse->flowcombiner, pad);
412
413   return tspad;
414 }
415
416 static void
417 mpegts_parse_destroy_tspad (MpegTSParse2 * parse, MpegTSParsePad * tspad)
418 {
419   /* free the wrapper */
420   g_free (tspad);
421 }
422
423 static void
424 mpegts_parse_pad_removed (GstElement * element, GstPad * pad)
425 {
426   MpegTSParsePad *tspad;
427   MpegTSParse2 *parse = GST_MPEGTS_PARSE (element);
428
429   if (gst_pad_get_direction (pad) == GST_PAD_SINK)
430     return;
431
432   tspad = (MpegTSParsePad *) gst_pad_get_element_private (pad);
433   if (tspad) {
434     mpegts_parse_destroy_tspad (parse, tspad);
435
436     parse->srcpads = g_list_remove_all (parse->srcpads, pad);
437   }
438
439   if (GST_ELEMENT_CLASS (parent_class)->pad_removed)
440     GST_ELEMENT_CLASS (parent_class)->pad_removed (element, pad);
441 }
442
443 static GstPad *
444 mpegts_parse_request_new_pad (GstElement * element, GstPadTemplate * template,
445     const gchar * padname, const GstCaps * caps)
446 {
447   MpegTSParse2 *parse;
448   MpegTSParsePad *tspad;
449   MpegTSParseProgram *parseprogram;
450   GstPad *pad;
451   gint program_num = -1;
452   GstEvent *event;
453   gchar *stream_id;
454
455   g_return_val_if_fail (template != NULL, NULL);
456   g_return_val_if_fail (GST_IS_MPEGTS_PARSE (element), NULL);
457   g_return_val_if_fail (padname != NULL, NULL);
458
459   sscanf (padname + 8, "%d", &program_num);
460
461   GST_DEBUG_OBJECT (element, "padname:%s, program:%d", padname, program_num);
462
463   parse = GST_MPEGTS_PARSE (element);
464
465   tspad = mpegts_parse_create_tspad (parse, padname);
466   tspad->program_number = program_num;
467
468   /* Find if the program is already active */
469   parseprogram =
470       (MpegTSParseProgram *) mpegts_base_get_program (GST_MPEGTS_BASE (parse),
471       program_num);
472   if (parseprogram) {
473     tspad->program = parseprogram;
474     parseprogram->tspad = tspad;
475   }
476
477   pad = tspad->pad;
478   parse->srcpads = g_list_append (parse->srcpads, pad);
479
480   gst_pad_set_active (pad, TRUE);
481
482   stream_id = gst_pad_create_stream_id (pad, element, padname + 8);
483
484   event =
485       gst_pad_get_sticky_event (parse->parent.sinkpad, GST_EVENT_STREAM_START,
486       0);
487   if (event) {
488     if (gst_event_parse_group_id (event, &parse->group_id))
489       parse->have_group_id = TRUE;
490     else
491       parse->have_group_id = FALSE;
492     gst_event_unref (event);
493   } else if (!parse->have_group_id) {
494     parse->have_group_id = TRUE;
495     parse->group_id = gst_util_group_id_next ();
496   }
497   event = gst_event_new_stream_start (stream_id);
498   if (parse->have_group_id)
499     gst_event_set_group_id (event, parse->group_id);
500
501   gst_pad_push_event (pad, event);
502   g_free (stream_id);
503
504   gst_element_add_pad (element, pad);
505
506   return pad;
507 }
508
509 static GstBuffer *
510 mpegts_packet_to_buffer (MpegTSPacketizerPacket * packet)
511 {
512   GstBuffer *buf =
513       gst_buffer_new_and_alloc (packet->data_end - packet->data_start);
514   gst_buffer_fill (buf, 0, packet->data_start,
515       packet->data_end - packet->data_start);
516   return buf;
517 }
518
519 static void
520 mpegts_parse_release_pad (GstElement * element, GstPad * pad)
521 {
522   MpegTSParse2 *parse = (MpegTSParse2 *) element;
523
524   gst_pad_set_active (pad, FALSE);
525   /* we do the cleanup in GstElement::pad-removed */
526   gst_flow_combiner_remove_pad (parse->flowcombiner, pad);
527   gst_element_remove_pad (element, pad);
528 }
529
530 static GstFlowReturn
531 mpegts_parse_tspad_push_section (MpegTSParse2 * parse, MpegTSParsePad * tspad,
532     GstMpegtsSection * section, MpegTSPacketizerPacket * packet)
533 {
534   GstFlowReturn ret = GST_FLOW_OK;
535   gboolean to_push = TRUE;
536
537   if (tspad->program_number != -1) {
538     if (tspad->program) {
539       /* we push all sections to all pads except PMTs which we
540        * only push to pads meant to receive that program number */
541       if (section->table_id == 0x02) {
542         /* PMT */
543         if (section->subtable_extension != tspad->program_number)
544           to_push = FALSE;
545       }
546     } else if (section->table_id != 0x00) {
547       /* there's a program filter on the pad but the PMT for the program has not
548        * been parsed yet, ignore the pad until we get a PMT.
549        * But we always allow PAT to go through */
550       to_push = FALSE;
551     }
552   }
553
554   GST_DEBUG_OBJECT (parse,
555       "pushing section: %d program number: %d table_id: %d", to_push,
556       tspad->program_number, section->table_id);
557
558   if (to_push) {
559     GstBuffer *buf = mpegts_packet_to_buffer (packet);
560     ret = gst_pad_push (tspad->pad, buf);
561     ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
562   }
563
564   GST_LOG_OBJECT (parse, "Returning %s", gst_flow_get_name (ret));
565   return ret;
566 }
567
568 static GstFlowReturn
569 mpegts_parse_tspad_push (MpegTSParse2 * parse, MpegTSParsePad * tspad,
570     MpegTSPacketizerPacket * packet)
571 {
572   GstFlowReturn ret = GST_FLOW_OK;
573   MpegTSBaseProgram *bp = NULL;
574
575   if (tspad->program_number != -1) {
576     if (tspad->program)
577       bp = (MpegTSBaseProgram *) tspad->program;
578     else
579       bp = mpegts_base_get_program ((MpegTSBase *) parse,
580           tspad->program_number);
581   }
582
583   if (bp) {
584     if (packet->pid == bp->pmt_pid || bp->streams == NULL
585         || bp->streams[packet->pid]) {
586       GstBuffer *buf = mpegts_packet_to_buffer (packet);
587       /* push if there's no filter or if the pid is in the filter */
588       ret = gst_pad_push (tspad->pad, buf);
589       ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
590     }
591   }
592   GST_DEBUG_OBJECT (parse, "Returning %s", gst_flow_get_name (ret));
593
594   return ret;
595 }
596
597 static void
598 pad_clear_for_push (GstPad * pad, MpegTSParse2 * parse)
599 {
600   MpegTSParsePad *tspad = (MpegTSParsePad *) gst_pad_get_element_private (pad);
601
602   tspad->flow_return = GST_FLOW_NOT_LINKED;
603   tspad->pushed = FALSE;
604 }
605
606 static GstFlowReturn
607 mpegts_parse_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
608     GstMpegtsSection * section)
609 {
610   MpegTSParse2 *parse = (MpegTSParse2 *) base;
611   guint32 pads_cookie;
612   gboolean done = FALSE;
613   GstPad *pad = NULL;
614   MpegTSParsePad *tspad;
615   GstFlowReturn ret;
616   GList *srcpads;
617   GstBuffer *buf;
618
619   GST_OBJECT_LOCK (parse);
620   srcpads = parse->srcpads;
621
622   /* clear tspad->pushed on pads */
623   g_list_foreach (srcpads, (GFunc) pad_clear_for_push, parse);
624   if (srcpads)
625     ret = GST_FLOW_NOT_LINKED;
626   else
627     ret = GST_FLOW_OK;
628
629   /* Get cookie and source pads list */
630   pads_cookie = GST_ELEMENT_CAST (parse)->pads_cookie;
631   if (G_LIKELY (srcpads)) {
632     pad = GST_PAD_CAST (srcpads->data);
633     g_object_ref (pad);
634   }
635   GST_OBJECT_UNLOCK (parse);
636
637   buf = mpegts_packet_to_buffer (packet);
638   if (!(packet->afc_flags & MPEGTS_AFC_RANDOM_ACCESS_FLAG)) {
639     gst_buffer_set_flags (buf, GST_BUFFER_FLAG_DELTA_UNIT);
640   }
641   ret = mpegts_parse_have_buffer (base, buf);
642
643   while (pad && !done) {
644     tspad = gst_pad_get_element_private (pad);
645
646     if (G_LIKELY (!tspad->pushed)) {
647       if (section) {
648         tspad->flow_return =
649             mpegts_parse_tspad_push_section (parse, tspad, section, packet);
650       } else {
651         tspad->flow_return = mpegts_parse_tspad_push (parse, tspad, packet);
652       }
653       tspad->pushed = TRUE;
654
655       if (G_UNLIKELY (tspad->flow_return != GST_FLOW_OK
656               && tspad->flow_return != GST_FLOW_NOT_LINKED)) {
657         /* return the error upstream */
658         ret = tspad->flow_return;
659         done = TRUE;
660       }
661
662     }
663
664     if (ret == GST_FLOW_NOT_LINKED)
665       ret = tspad->flow_return;
666
667     g_object_unref (pad);
668
669     if (G_UNLIKELY (!done)) {
670       GST_OBJECT_LOCK (parse);
671       if (G_UNLIKELY (pads_cookie != GST_ELEMENT_CAST (parse)->pads_cookie)) {
672         /* resync */
673         GST_DEBUG ("resync");
674         pads_cookie = GST_ELEMENT_CAST (parse)->pads_cookie;
675         srcpads = parse->srcpads;
676       } else {
677         GST_DEBUG ("getting next pad");
678         /* Get next pad */
679         srcpads = g_list_next (srcpads);
680       }
681
682       if (srcpads) {
683         pad = GST_PAD_CAST (srcpads->data);
684         g_object_ref (pad);
685       } else
686         done = TRUE;
687       GST_OBJECT_UNLOCK (parse);
688     }
689   }
690
691   return ret;
692 }
693
694 static void
695 mpegts_parse_inspect_packet (MpegTSBase * base, MpegTSPacketizerPacket * packet)
696 {
697   MpegTSParse2 *parse = GST_MPEGTS_PARSE (base);
698   GST_LOG ("pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p PCR %"
699       G_GUINT64_FORMAT, packet->pid, packet->payload_unit_start_indicator,
700       packet->scram_afc_cc & 0x30,
701       FLAGS_CONTINUITY_COUNTER (packet->scram_afc_cc), packet->payload,
702       packet->pcr);
703
704   /* Store the PCR if desired */
705   if (parse->current_pcr == GST_CLOCK_TIME_NONE &&
706       packet->afc_flags & MPEGTS_AFC_PCR_FLAG) {
707     /* Take this as the pcr_pid if set to auto-select */
708     if (parse->pcr_pid == -1)
709       parse->pcr_pid = packet->pid;
710     /* Check the PCR-PID matches the program we want for multiple programs */
711     if (parse->pcr_pid == packet->pid) {
712       parse->current_pcr = mpegts_packetizer_pts_to_ts (base->packetizer,
713           PCRTIME_TO_GSTTIME (packet->pcr), parse->pcr_pid);
714       GST_DEBUG ("Got new PCR %" GST_TIME_FORMAT " raw %" G_GUINT64_FORMAT,
715           GST_TIME_ARGS (parse->current_pcr), packet->pcr);
716       if (parse->base_pcr == GST_CLOCK_TIME_NONE) {
717         parse->base_pcr = parse->current_pcr;
718       }
719     }
720   }
721 }
722
723 static GstClockTime
724 get_pending_timestamp_diff (MpegTSParse2 * parse)
725 {
726   GList *l;
727   GstClockTime first_ts, last_ts;
728
729   if (parse->pending_buffers == NULL)
730     return GST_CLOCK_TIME_NONE;
731
732   l = g_list_last (parse->pending_buffers);
733   first_ts = GST_BUFFER_PTS (l->data);
734   if (first_ts == GST_CLOCK_TIME_NONE)
735     return GST_CLOCK_TIME_NONE;
736
737   l = g_list_first (parse->pending_buffers);
738   last_ts = GST_BUFFER_PTS (l->data);
739   if (last_ts == GST_CLOCK_TIME_NONE)
740     return GST_CLOCK_TIME_NONE;
741
742   return last_ts - first_ts;
743 }
744
745 static GstFlowReturn
746 drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all)
747 {
748   GstFlowReturn ret = GST_FLOW_OK;
749   GstClockTime start_ts;
750   GstClockTime pcr = GST_CLOCK_TIME_NONE;
751   GstClockTime pcr_diff = 0;
752   gsize pcr_bytes, bytes_since_pcr, pos;
753   GstBuffer *buffer;
754   GList *l, *end = NULL;
755
756   if (parse->pending_buffers == NULL)
757     return GST_FLOW_OK;         /* Nothing to push */
758
759   /*
760    * There are 4 cases:
761    *  1 We get a buffer with no PCR -> it's the head of the list
762    *      -> Do nothing, unless it's EOS
763    *  2 We get a buffer with a PCR, it's the first PCR we've seen, and belongs
764    *    to the buffer at the head of the list
765    *    -> Push any buffers in the list except the head,
766    *       using a smoothing of their timestamps to land at the PCR
767    *    -> store new PCR as the previous PCR, bytes_since_pcr = sizeof (buffer);
768    *  3 It's EOS (drain_all == TRUE, current_pcr == NONE)
769    *    -> Push any buffers in the list using a smoothing of their timestamps
770    *       starting at the previous PCR or first TS
771    *  4 We get a buffer with a PCR, and have a previous PCR
772    *    -> If distance > smoothing_latency,
773    *       output buffers except the last in the pending queue using
774    *       piecewise-linear timestamps
775    *    -> store new PCR as the previous PCR, bytes_since_pcr = sizeof (buffer);
776    */
777
778   /* Case 1 */
779   if (!GST_CLOCK_TIME_IS_VALID (parse->current_pcr) && !drain_all)
780     return GST_FLOW_OK;
781
782   if (GST_CLOCK_TIME_IS_VALID (parse->current_pcr)) {
783     pcr = parse->current_pcr;
784     parse->current_pcr = GST_CLOCK_TIME_NONE;
785   }
786
787   /* The bytes of the last buffer are after the PCR */
788   buffer = GST_BUFFER (g_list_nth_data (parse->pending_buffers, 0));
789   bytes_since_pcr = gst_buffer_get_size (buffer);
790
791   pcr_bytes = parse->bytes_since_pcr - bytes_since_pcr;
792
793   if (!drain_all)
794     end = g_list_first (parse->pending_buffers);
795
796   /* Case 2 */
797   if (!GST_CLOCK_TIME_IS_VALID (parse->previous_pcr)) {
798     pcr_diff = get_pending_timestamp_diff (parse);
799
800     /* Calculate the start_ts that ends at the end timestamp */
801     start_ts = GST_CLOCK_TIME_NONE;
802     if (end) {
803       start_ts = GST_BUFFER_PTS (GST_BUFFER (end->data));
804       if (start_ts > pcr_diff)
805         start_ts -= pcr_diff;
806     }
807   } else if (drain_all) {       /* Case 3 */
808     start_ts = parse->previous_pcr;
809     pcr_diff = get_pending_timestamp_diff (parse);
810   } else {                      /* Case 4 */
811     start_ts = parse->previous_pcr;
812     if (GST_CLOCK_TIME_IS_VALID (pcr) && pcr > start_ts)
813       pcr_diff = GST_CLOCK_DIFF (start_ts, pcr);
814
815     /* Make sure PCR observations are sufficiently far apart */
816     if (drain_all == FALSE && pcr_diff < parse->smoothing_latency)
817       return GST_FLOW_OK;
818   }
819
820   GST_INFO_OBJECT (parse, "Pushing buffers - startTS %" GST_TIME_FORMAT
821       " duration %" GST_TIME_FORMAT " %" G_GSIZE_FORMAT " bytes",
822       GST_TIME_ARGS (start_ts), GST_TIME_ARGS (pcr_diff), pcr_bytes);
823
824   /* Now, push buffers out pacing timestamps over pcr_diff time and pcr_bytes */
825   pos = 0;
826   l = g_list_last (parse->pending_buffers);
827   while (l != end) {
828     GList *p;
829     GstClockTime out_ts = start_ts;
830
831     buffer = gst_buffer_make_writable (GST_BUFFER (l->data));
832
833     if (out_ts != GST_CLOCK_TIME_NONE && pcr_diff != GST_CLOCK_TIME_NONE &&
834         pcr_bytes && pos)
835       out_ts += gst_util_uint64_scale (pcr_diff, pos, pcr_bytes);
836
837     pos += gst_buffer_get_size (buffer);
838
839     GST_DEBUG_OBJECT (parse,
840         "InputTS %" GST_TIME_FORMAT " out %" GST_TIME_FORMAT,
841         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)), GST_TIME_ARGS (out_ts));
842
843     GST_BUFFER_PTS (buffer) = out_ts + parse->ts_offset;
844     GST_BUFFER_DTS (buffer) = out_ts + parse->ts_offset;
845     if (ret == GST_FLOW_OK) {
846       ret = gst_pad_push (parse->srcpad, buffer);
847       ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
848     } else
849       gst_buffer_unref (buffer);
850
851     /* Free this list node and move to the next */
852     p = g_list_previous (l);
853     parse->pending_buffers = g_list_delete_link (parse->pending_buffers, l);
854     l = p;
855   }
856
857   parse->pending_buffers = end;
858   parse->bytes_since_pcr = bytes_since_pcr;
859   parse->previous_pcr = pcr;
860   return ret;
861 }
862
863 static GstFlowReturn
864 mpegts_parse_have_buffer (MpegTSBase * base, GstBuffer * buffer)
865 {
866   MpegTSParse2 *parse = GST_MPEGTS_PARSE (base);
867   GstFlowReturn ret = GST_FLOW_OK;
868
869   GST_LOG_OBJECT (parse, "Received buffer %" GST_PTR_FORMAT, buffer);
870
871   if (parse->current_pcr != GST_CLOCK_TIME_NONE) {
872     GST_DEBUG_OBJECT (parse,
873         "InputTS %" GST_TIME_FORMAT " PCR %" GST_TIME_FORMAT,
874         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)),
875         GST_TIME_ARGS (parse->current_pcr));
876   }
877
878   if (parse->set_timestamps || parse->first) {
879     parse->pending_buffers = g_list_prepend (parse->pending_buffers, buffer);
880     parse->bytes_since_pcr += gst_buffer_get_size (buffer);
881     buffer = NULL;
882   }
883
884   if (!prepare_src_pad (base, parse))
885     return GST_FLOW_OK;
886
887   if (parse->pending_buffers != NULL) {
888     /* Don't keep pending_buffers if not setting output timestamps */
889     gboolean drain_all = (parse->set_timestamps == FALSE);
890     ret = drain_pending_buffers (parse, drain_all);
891     if (ret != GST_FLOW_OK) {
892       if (buffer)
893         gst_buffer_unref (buffer);
894       return ret;
895     }
896   }
897
898   if (buffer != NULL) {
899     ret = gst_pad_push (parse->srcpad, buffer);
900     ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
901   }
902
903   return ret;
904 }
905
906 static MpegTSParsePad *
907 find_pad_for_program (MpegTSParse2 * parse, guint program_number)
908 {
909   GList *tmp;
910
911   for (tmp = parse->srcpads; tmp; tmp = tmp->next) {
912     MpegTSParsePad *tspad = gst_pad_get_element_private ((GstPad *) tmp->data);
913
914     if (tspad->program_number == program_number)
915       return tspad;
916   }
917
918   return NULL;
919 }
920
921 static void
922 mpegts_parse_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
923 {
924   MpegTSParse2 *parse = GST_MPEGTS_PARSE (base);
925   MpegTSParseProgram *parseprogram = (MpegTSParseProgram *) program;
926   MpegTSParsePad *tspad;
927
928   /* If we have a request pad for that program, activate it */
929   tspad = find_pad_for_program (parse, program->program_number);
930
931   if (tspad) {
932     tspad->program = parseprogram;
933     parseprogram->tspad = tspad;
934   }
935 }
936
937 static void
938 mpegts_parse_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program)
939 {
940   MpegTSParse2 *parse = GST_MPEGTS_PARSE (base);
941   MpegTSParseProgram *parseprogram = (MpegTSParseProgram *) program;
942   MpegTSParsePad *tspad;
943
944   /* If we have a request pad for that program, activate it */
945   tspad = find_pad_for_program (parse, program->program_number);
946
947   if (tspad) {
948     tspad->program = NULL;
949     parseprogram->tspad = NULL;
950   }
951
952   parse->pcr_pid = -1;
953   parse->ts_offset += parse->current_pcr - parse->base_pcr;
954   parse->base_pcr = GST_CLOCK_TIME_NONE;
955 }
956
957 static gboolean
958 mpegts_parse_src_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
959 {
960   MpegTSParse2 *parse = GST_MPEGTS_PARSE (parent);
961   gboolean res;
962
963   switch (GST_QUERY_TYPE (query)) {
964     case GST_QUERY_LATENCY:
965     {
966       if ((res = gst_pad_peer_query (((MpegTSBase *) parse)->sinkpad, query))) {
967         gboolean is_live;
968         GstClockTime min_latency, max_latency;
969
970         gst_query_parse_latency (query, &is_live, &min_latency, &max_latency);
971         if (is_live) {
972           GstClockTime extra_latency = TS_LATENCY * GST_MSECOND;
973           if (parse->set_timestamps) {
974             extra_latency = MAX (extra_latency, parse->smoothing_latency);
975           }
976           min_latency += extra_latency;
977           if (max_latency != GST_CLOCK_TIME_NONE)
978             max_latency += extra_latency;
979         }
980
981         gst_query_set_latency (query, is_live, min_latency, max_latency);
982       }
983       break;
984     }
985     default:
986       res = gst_pad_query_default (pad, parent, query);
987   }
988   return res;
989 }
990
991 gboolean
992 gst_mpegtsparse_plugin_init (GstPlugin * plugin)
993 {
994   GST_DEBUG_CATEGORY_INIT (mpegts_parse_debug, "tsparse", 0,
995       "MPEG transport stream parser");
996
997   return gst_element_register (plugin, "tsparse",
998       GST_RANK_NONE, GST_TYPE_MPEGTS_PARSE);
999 }