mpegtsparse: Moved dispose function into finalize
[platform/upstream/gstreamer.git] / gst / mpegtsdemux / mpegtsparse.c
1 /*
2  * mpegtsparse.c - 
3  * Copyright (C) 2007 Alessandro Decina
4  * 
5  * Authors:
6  *   Alessandro Decina <alessandro@nnva.org>
7  *   Zaheer Abbas Merali <zaheerabbas at merali dot org>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24
25 #ifdef HAVE_CONFIG_H
26 #include "config.h"
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <string.h>
32
33 #include "mpegtsbase.h"
34 #include "mpegtsparse.h"
35 #include "gstmpegdesc.h"
36
37 /* latency in mseconds is maximum 100 ms between PCR */
38 #define TS_LATENCY 100
39
40 #define TABLE_ID_UNSET 0xFF
41 #define RUNNING_STATUS_RUNNING 4
42 #define SYNC_BYTE 0x47
43
44 GST_DEBUG_CATEGORY_STATIC (mpegts_parse_debug);
45 #define GST_CAT_DEFAULT mpegts_parse_debug
46
47 typedef struct _MpegTSParsePad MpegTSParsePad;
48
49 typedef struct
50 {
51   MpegTSBaseProgram program;
52   MpegTSParsePad *tspad;
53 } MpegTSParseProgram;
54
55 struct _MpegTSParsePad
56 {
57   GstPad *pad;
58
59   /* the program number that the peer wants on this pad */
60   gint program_number;
61   MpegTSParseProgram *program;
62
63   /* set to FALSE before a push and TRUE after */
64   gboolean pushed;
65
66   /* the return of the latest push */
67   GstFlowReturn flow_return;
68
69   MpegTSParse2Adapter ts_adapter;
70 };
71
72 static GstStaticPadTemplate src_template =
73 GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC,
74     GST_PAD_ALWAYS,
75     GST_STATIC_CAPS ("video/mpegts, " "systemstream = (boolean) true ")
76     );
77
78 static GstStaticPadTemplate program_template =
79 GST_STATIC_PAD_TEMPLATE ("program_%u", GST_PAD_SRC,
80     GST_PAD_REQUEST,
81     GST_STATIC_CAPS ("video/mpegts, " "systemstream = (boolean) true ")
82     );
83
84 enum
85 {
86   PROP_0,
87   PROP_SET_TIMESTAMPS,
88   PROP_SMOOTHING_LATENCY,
89   PROP_PCR_PID,
90   PROP_ALIGNMENT,
91   /* FILL ME */
92 };
93
94 static void mpegts_parse_set_property (GObject * object, guint prop_id,
95     const GValue * value, GParamSpec * pspec);
96 static void mpegts_parse_get_property (GObject * object, guint prop_id,
97     GValue * value, GParamSpec * pspec);
98
99 static void
100 mpegts_parse_program_started (MpegTSBase * base, MpegTSBaseProgram * program);
101 static void
102 mpegts_parse_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program);
103
104 static GstFlowReturn
105 mpegts_parse_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
106     GstMpegtsSection * section);
107 static void mpegts_parse_inspect_packet (MpegTSBase * base,
108     MpegTSPacketizerPacket * packet);
109 static GstFlowReturn mpegts_parse_have_buffer (MpegTSBase * base,
110     GstBuffer * buffer);
111
112 static MpegTSParsePad *mpegts_parse_create_tspad (MpegTSParse2 * parse,
113     const gchar * name);
114 static void mpegts_parse_destroy_tspad (MpegTSParse2 * parse,
115     MpegTSParsePad * tspad);
116
117 static void mpegts_parse_pad_removed (GstElement * element, GstPad * pad);
118 static GstPad *mpegts_parse_request_new_pad (GstElement * element,
119     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
120 static void mpegts_parse_release_pad (GstElement * element, GstPad * pad);
121 static gboolean mpegts_parse_src_pad_query (GstPad * pad, GstObject * parent,
122     GstQuery * query);
123 static gboolean push_event (MpegTSBase * base, GstEvent * event);
124
125 #define mpegts_parse_parent_class parent_class
126 G_DEFINE_TYPE (MpegTSParse2, mpegts_parse, GST_TYPE_MPEGTS_BASE);
127 static void mpegts_parse_reset (MpegTSBase * base);
128 static GstFlowReturn mpegts_parse_input_done (MpegTSBase * base,
129     GstBuffer * buffer);
130 static GstFlowReturn
131 drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all);
132
133 static void
134 mpegts_parse_finalize (GObject * object)
135 {
136   MpegTSParse2 *parse = (MpegTSParse2 *) object;
137
138   gst_flow_combiner_free (parse->flowcombiner);
139
140   gst_adapter_clear (parse->ts_adapter.adapter);
141   g_object_unref (parse->ts_adapter.adapter);
142
143   GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object));
144 }
145
146 static void
147 mpegts_parse_class_init (MpegTSParse2Class * klass)
148 {
149   GObjectClass *gobject_class = (GObjectClass *) (klass);
150   GstElementClass *element_class;
151   MpegTSBaseClass *ts_class;
152
153   gobject_class->set_property = mpegts_parse_set_property;
154   gobject_class->get_property = mpegts_parse_get_property;
155   gobject_class->finalize = mpegts_parse_finalize;
156
157   g_object_class_install_property (gobject_class, PROP_SET_TIMESTAMPS,
158       g_param_spec_boolean ("set-timestamps",
159           "Timestamp (or re-timestamp) the output stream",
160           "If set, timestamps will be set on the output buffers using "
161           "PCRs and smoothed over the smoothing-latency period", FALSE,
162           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
163   g_object_class_install_property (gobject_class, PROP_SMOOTHING_LATENCY,
164       g_param_spec_uint ("smoothing-latency", "Smoothing Latency",
165           "Additional latency in microseconds for smoothing jitter in input timestamps on live capture",
166           0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
167   g_object_class_install_property (gobject_class, PROP_PCR_PID,
168       g_param_spec_int ("pcr-pid", "PID containing PCR",
169           "Set the PID to use for PCR values (-1 for auto)",
170           -1, G_MAXINT, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
171   g_object_class_install_property (gobject_class, PROP_ALIGNMENT,
172       g_param_spec_uint ("alignment", "Alignment",
173           "Number of packets per buffer (padded with dummy packets on EOS) (0 = auto)",
174           0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
175
176   element_class = GST_ELEMENT_CLASS (klass);
177   element_class->pad_removed = mpegts_parse_pad_removed;
178   element_class->request_new_pad = mpegts_parse_request_new_pad;
179   element_class->release_pad = mpegts_parse_release_pad;
180
181   gst_element_class_add_static_pad_template (element_class, &src_template);
182   gst_element_class_add_static_pad_template (element_class, &program_template);
183
184   gst_element_class_set_static_metadata (element_class,
185       "MPEG transport stream parser", "Codec/Parser",
186       "Parses MPEG2 transport streams",
187       "Alessandro Decina <alessandro@nnva.org>, "
188       "Zaheer Abbas Merali <zaheerabbas at merali dot org>");
189
190   ts_class = GST_MPEGTS_BASE_CLASS (klass);
191   ts_class->push = GST_DEBUG_FUNCPTR (mpegts_parse_push);
192   ts_class->push_event = GST_DEBUG_FUNCPTR (push_event);
193   ts_class->program_started = GST_DEBUG_FUNCPTR (mpegts_parse_program_started);
194   ts_class->program_stopped = GST_DEBUG_FUNCPTR (mpegts_parse_program_stopped);
195   ts_class->reset = GST_DEBUG_FUNCPTR (mpegts_parse_reset);
196   ts_class->input_done = GST_DEBUG_FUNCPTR (mpegts_parse_input_done);
197   ts_class->inspect_packet = GST_DEBUG_FUNCPTR (mpegts_parse_inspect_packet);
198 }
199
200 static void
201 mpegts_parse_init (MpegTSParse2 * parse)
202 {
203   MpegTSBase *base = (MpegTSBase *) parse;
204
205   base->program_size = sizeof (MpegTSParseProgram);
206   base->push_data = TRUE;
207   base->push_section = TRUE;
208
209   parse->user_pcr_pid = parse->pcr_pid = -1;
210
211   parse->flowcombiner = gst_flow_combiner_new ();
212
213   parse->srcpad = gst_pad_new_from_static_template (&src_template, "src");
214   gst_flow_combiner_add_pad (parse->flowcombiner, parse->srcpad);
215   parse->first = TRUE;
216   gst_pad_set_query_function (parse->srcpad,
217       GST_DEBUG_FUNCPTR (mpegts_parse_src_pad_query));
218   gst_element_add_pad (GST_ELEMENT (parse), parse->srcpad);
219
220   parse->have_group_id = FALSE;
221   parse->group_id = G_MAXUINT;
222
223   parse->ts_adapter.adapter = gst_adapter_new ();
224   parse->ts_adapter.packets_in_adapter = 0;
225   parse->alignment = 0;
226   parse->is_eos = FALSE;
227   parse->header = 0;
228 }
229
230 static void
231 mpegts_parse_reset (MpegTSBase * base)
232 {
233   MpegTSParse2 *parse = (MpegTSParse2 *) base;
234
235   /* Set the various know PIDs we are interested in */
236
237   /* CAT */
238   MPEGTS_BIT_SET (base->known_psi, 1);
239   /* NIT, ST */
240   MPEGTS_BIT_SET (base->known_psi, 0x10);
241   /* SDT, BAT, ST */
242   MPEGTS_BIT_SET (base->known_psi, 0x11);
243   /* EIT, ST, CIT (TS 102 323) */
244   MPEGTS_BIT_SET (base->known_psi, 0x12);
245   /* RST, ST */
246   MPEGTS_BIT_SET (base->known_psi, 0x13);
247   /* RNT (TS 102 323) */
248   MPEGTS_BIT_SET (base->known_psi, 0x16);
249   /* inband signalling */
250   MPEGTS_BIT_SET (base->known_psi, 0x1c);
251   /* measurement */
252   MPEGTS_BIT_SET (base->known_psi, 0x1d);
253   /* DIT */
254   MPEGTS_BIT_SET (base->known_psi, 0x1e);
255   /* SIT */
256   MPEGTS_BIT_SET (base->known_psi, 0x1f);
257
258   parse->first = TRUE;
259   parse->have_group_id = FALSE;
260   parse->group_id = G_MAXUINT;
261
262   g_list_free_full (parse->pending_buffers, (GDestroyNotify) gst_buffer_unref);
263   parse->pending_buffers = NULL;
264
265   parse->current_pcr = GST_CLOCK_TIME_NONE;
266   parse->previous_pcr = GST_CLOCK_TIME_NONE;
267   parse->base_pcr = GST_CLOCK_TIME_NONE;
268   parse->bytes_since_pcr = 0;
269   parse->pcr_pid = parse->user_pcr_pid;
270   parse->ts_offset = 0;
271
272   gst_adapter_clear (parse->ts_adapter.adapter);
273   parse->ts_adapter.packets_in_adapter = 0;
274   parse->is_eos = FALSE;
275   parse->header = 0;
276 }
277
278 static void
279 mpegts_parse_set_property (GObject * object, guint prop_id,
280     const GValue * value, GParamSpec * pspec)
281 {
282   MpegTSParse2 *parse = (MpegTSParse2 *) object;
283
284   switch (prop_id) {
285     case PROP_SET_TIMESTAMPS:
286       parse->set_timestamps = g_value_get_boolean (value);
287       break;
288     case PROP_SMOOTHING_LATENCY:
289       parse->smoothing_latency = GST_USECOND * g_value_get_uint (value);
290       mpegts_packetizer_set_pcr_discont_threshold (GST_MPEGTS_BASE
291           (parse)->packetizer, parse->smoothing_latency);
292       break;
293     case PROP_PCR_PID:
294       parse->pcr_pid = parse->user_pcr_pid = g_value_get_int (value);
295       break;
296     case PROP_ALIGNMENT:
297       parse->alignment = g_value_get_uint (value);
298       break;
299     default:
300       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
301   }
302 }
303
304 static void
305 mpegts_parse_get_property (GObject * object, guint prop_id,
306     GValue * value, GParamSpec * pspec)
307 {
308   MpegTSParse2 *parse = (MpegTSParse2 *) object;
309
310   switch (prop_id) {
311     case PROP_SET_TIMESTAMPS:
312       g_value_set_boolean (value, parse->set_timestamps);
313       break;
314     case PROP_SMOOTHING_LATENCY:
315       g_value_set_uint (value, parse->smoothing_latency / GST_USECOND);
316       break;
317     case PROP_PCR_PID:
318       g_value_set_int (value, parse->pcr_pid);
319       break;
320     case PROP_ALIGNMENT:
321       g_value_set_uint (value, parse->alignment);
322       break;
323     default:
324       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
325   }
326 }
327
328 static gboolean
329 prepare_src_pad (MpegTSBase * base, MpegTSParse2 * parse)
330 {
331   GstEvent *event;
332   gchar *stream_id;
333   GstCaps *caps;
334
335   if (!parse->first)
336     return TRUE;
337
338   /* If there's no packet_size yet, we can't set caps yet */
339   if (G_UNLIKELY (base->packetizer->packet_size == 0))
340     return FALSE;
341
342   stream_id =
343       gst_pad_create_stream_id (parse->srcpad, GST_ELEMENT_CAST (base),
344       "multi-program");
345
346   event =
347       gst_pad_get_sticky_event (parse->parent.sinkpad, GST_EVENT_STREAM_START,
348       0);
349   if (event) {
350     if (gst_event_parse_group_id (event, &parse->group_id))
351       parse->have_group_id = TRUE;
352     else
353       parse->have_group_id = FALSE;
354     gst_event_unref (event);
355   } else if (!parse->have_group_id) {
356     parse->have_group_id = TRUE;
357     parse->group_id = gst_util_group_id_next ();
358   }
359   event = gst_event_new_stream_start (stream_id);
360   if (parse->have_group_id)
361     gst_event_set_group_id (event, parse->group_id);
362
363   gst_pad_push_event (parse->srcpad, event);
364   g_free (stream_id);
365
366   caps = gst_caps_new_simple ("video/mpegts",
367       "systemstream", G_TYPE_BOOLEAN, TRUE,
368       "packetsize", G_TYPE_INT, base->packetizer->packet_size, NULL);
369
370   gst_pad_set_caps (parse->srcpad, caps);
371   gst_caps_unref (caps);
372
373   /* If setting output timestamps, ensure that the output segment is TIME */
374   if (parse->set_timestamps == FALSE || base->segment.format == GST_FORMAT_TIME)
375     /* Just use the upstream segment */
376     base->out_segment = base->segment;
377   else {
378     GstSegment *seg = &base->out_segment;
379     gst_segment_init (seg, GST_FORMAT_TIME);
380     GST_DEBUG_OBJECT (parse,
381         "Generating time output segment %" GST_SEGMENT_FORMAT, seg);
382   }
383   gst_pad_push_event (parse->srcpad,
384       gst_event_new_segment (&base->out_segment));
385
386   parse->first = FALSE;
387
388   return TRUE;
389 }
390
391 static gboolean
392 push_event (MpegTSBase * base, GstEvent * event)
393 {
394   MpegTSParse2 *parse = (MpegTSParse2 *) base;
395   GList *tmp;
396
397   if (G_UNLIKELY (parse->first)) {
398     /* We will send the segment when really starting  */
399     if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT)) {
400       gst_event_unref (event);
401       return TRUE;
402     }
403     prepare_src_pad (base, parse);
404   }
405   if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_EOS)) {
406     parse->is_eos = TRUE;
407
408     if (parse->alignment > 0 && parse->ts_adapter.packets_in_adapter > 0
409         && parse->ts_adapter.packets_in_adapter < parse->alignment) {
410       GstBuffer *buf;
411       GstMapInfo map;
412       guint8 *data;
413       gint missing_packets =
414           parse->alignment - parse->ts_adapter.packets_in_adapter;
415       gint i = missing_packets;
416       gsize packet_size = base->packetizer->packet_size;
417
418       GST_DEBUG_OBJECT (parse, "Adding %d dummy packets", missing_packets);
419
420       buf = gst_buffer_new_and_alloc (missing_packets * packet_size);
421       gst_buffer_map (buf, &map, GST_MAP_READWRITE);
422       data = map.data;
423
424       g_assert (packet_size > 0);
425
426       for (; i > 0; i--) {
427         gint offset;
428
429         if (packet_size > MPEGTS_NORMAL_PACKETSIZE) {
430           parse->header++;
431           GST_WRITE_UINT32_BE (data, parse->header);
432           offset = 4;
433         } else {
434           offset = 0;
435         }
436         GST_WRITE_UINT8 (data + offset, SYNC_BYTE);
437         /* null packet PID */
438         GST_WRITE_UINT16_BE (data + offset + 1, 0x1FFF);
439         /* no adaptation field exists | continuity counter undefined */
440         GST_WRITE_UINT8 (data + offset + 3, 0x10);
441         /* payload */
442         memset (data + offset + 4, 0, MPEGTS_NORMAL_PACKETSIZE - 4);
443         data += packet_size;
444       }
445       gst_buffer_unmap (buf, &map);
446       gst_adapter_push (parse->ts_adapter.adapter, buf);
447       parse->ts_adapter.packets_in_adapter += missing_packets;
448     }
449     drain_pending_buffers (parse, TRUE);
450   }
451
452   if (G_UNLIKELY (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
453     parse->ts_offset = 0;
454
455   for (tmp = parse->srcpads; tmp; tmp = tmp->next) {
456     GstPad *pad = (GstPad *) tmp->data;
457     if (pad) {
458       gst_event_ref (event);
459       gst_pad_push_event (pad, event);
460     }
461   }
462
463   gst_pad_push_event (parse->srcpad, event);
464
465   return TRUE;
466 }
467
468 static MpegTSParsePad *
469 mpegts_parse_create_tspad (MpegTSParse2 * parse, const gchar * pad_name)
470 {
471   GstPad *pad;
472   MpegTSParsePad *tspad;
473
474   pad = gst_pad_new_from_static_template (&program_template, pad_name);
475   gst_pad_set_query_function (pad,
476       GST_DEBUG_FUNCPTR (mpegts_parse_src_pad_query));
477
478   /* create our wrapper */
479   tspad = g_new0 (MpegTSParsePad, 1);
480   tspad->pad = pad;
481   tspad->program_number = -1;
482   tspad->program = NULL;
483   tspad->pushed = FALSE;
484   tspad->flow_return = GST_FLOW_NOT_LINKED;
485   tspad->ts_adapter.adapter = gst_adapter_new ();
486   tspad->ts_adapter.packets_in_adapter = 0;
487   gst_pad_set_element_private (pad, tspad);
488   gst_flow_combiner_add_pad (parse->flowcombiner, pad);
489
490   return tspad;
491 }
492
493 static void
494 mpegts_parse_destroy_tspad (MpegTSParse2 * parse, MpegTSParsePad * tspad)
495 {
496   gst_adapter_clear (tspad->ts_adapter.adapter);
497   g_object_unref (tspad->ts_adapter.adapter);
498
499   /* free the wrapper */
500   g_free (tspad);
501 }
502
503 static void
504 mpegts_parse_pad_removed (GstElement * element, GstPad * pad)
505 {
506   MpegTSParsePad *tspad;
507   MpegTSParse2 *parse = GST_MPEGTS_PARSE (element);
508
509   if (gst_pad_get_direction (pad) == GST_PAD_SINK)
510     return;
511
512   tspad = (MpegTSParsePad *) gst_pad_get_element_private (pad);
513   if (tspad) {
514     mpegts_parse_destroy_tspad (parse, tspad);
515
516     parse->srcpads = g_list_remove_all (parse->srcpads, pad);
517   }
518
519   if (GST_ELEMENT_CLASS (parent_class)->pad_removed)
520     GST_ELEMENT_CLASS (parent_class)->pad_removed (element, pad);
521 }
522
523 static GstPad *
524 mpegts_parse_request_new_pad (GstElement * element, GstPadTemplate * template,
525     const gchar * padname, const GstCaps * caps)
526 {
527   MpegTSParse2 *parse;
528   MpegTSParsePad *tspad;
529   MpegTSParseProgram *parseprogram;
530   GstPad *pad;
531   gint program_num = -1;
532   GstEvent *event;
533   gchar *stream_id;
534
535   g_return_val_if_fail (template != NULL, NULL);
536   g_return_val_if_fail (GST_IS_MPEGTS_PARSE (element), NULL);
537   g_return_val_if_fail (padname != NULL, NULL);
538
539   sscanf (padname + 8, "%d", &program_num);
540
541   GST_DEBUG_OBJECT (element, "padname:%s, program:%d", padname, program_num);
542
543   parse = GST_MPEGTS_PARSE (element);
544
545   tspad = mpegts_parse_create_tspad (parse, padname);
546   tspad->program_number = program_num;
547
548   /* Find if the program is already active */
549   parseprogram =
550       (MpegTSParseProgram *) mpegts_base_get_program (GST_MPEGTS_BASE (parse),
551       program_num);
552   if (parseprogram) {
553     tspad->program = parseprogram;
554     parseprogram->tspad = tspad;
555   }
556
557   pad = tspad->pad;
558   parse->srcpads = g_list_append (parse->srcpads, pad);
559
560   gst_pad_set_active (pad, TRUE);
561
562   stream_id = gst_pad_create_stream_id (pad, element, padname + 8);
563
564   event =
565       gst_pad_get_sticky_event (parse->parent.sinkpad, GST_EVENT_STREAM_START,
566       0);
567   if (event) {
568     if (gst_event_parse_group_id (event, &parse->group_id))
569       parse->have_group_id = TRUE;
570     else
571       parse->have_group_id = FALSE;
572     gst_event_unref (event);
573   } else if (!parse->have_group_id) {
574     parse->have_group_id = TRUE;
575     parse->group_id = gst_util_group_id_next ();
576   }
577   event = gst_event_new_stream_start (stream_id);
578   if (parse->have_group_id)
579     gst_event_set_group_id (event, parse->group_id);
580
581   gst_pad_push_event (pad, event);
582   g_free (stream_id);
583
584   gst_element_add_pad (element, pad);
585
586   return pad;
587 }
588
589 static GstBuffer *
590 mpegts_packet_to_buffer (MpegTSPacketizerPacket * packet)
591 {
592   GstBuffer *buf =
593       gst_buffer_new_and_alloc (packet->data_end - packet->data_start);
594   gst_buffer_fill (buf, 0, packet->data_start,
595       packet->data_end - packet->data_start);
596   return buf;
597 }
598
599 static void
600 mpegts_parse_release_pad (GstElement * element, GstPad * pad)
601 {
602   MpegTSParse2 *parse = (MpegTSParse2 *) element;
603
604   gst_pad_set_active (pad, FALSE);
605   /* we do the cleanup in GstElement::pad-removed */
606   gst_flow_combiner_remove_pad (parse->flowcombiner, pad);
607   gst_element_remove_pad (element, pad);
608 }
609
610 static GstFlowReturn
611 empty_adapter_into_pad (MpegTSParse2Adapter * ts_adapter, GstPad * pad)
612 {
613   GstAdapter *adapter = ts_adapter->adapter;
614   GstBuffer *buf = NULL;
615   GstClockTime ts = gst_adapter_prev_pts (adapter, NULL);
616   gsize avail = gst_adapter_available (adapter);
617   GstFlowReturn ret = GST_FLOW_OK;
618
619   if (avail > 0)
620     buf = gst_adapter_take_buffer (adapter, avail);
621
622   ts_adapter->packets_in_adapter = 0;
623
624   if (buf) {
625     GST_BUFFER_PTS (buf) = ts;
626     ret = gst_pad_push (pad, buf);
627   }
628
629   return ret;
630 }
631
632 static GstFlowReturn
633 enqueue_and_maybe_push_buffer (MpegTSParse2 * parse, GstPad * pad,
634     MpegTSParse2Adapter * ts_adapter, GstBuffer * buffer)
635 {
636   GstFlowReturn ret = GST_FLOW_OK;
637
638   if (buffer != NULL) {
639     if (parse->alignment == 1) {
640       ret = gst_pad_push (pad, buffer);
641       ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
642     } else {
643       gst_adapter_push (ts_adapter->adapter, buffer);
644       ts_adapter->packets_in_adapter++;
645
646       if (ts_adapter->packets_in_adapter == parse->alignment
647           && ts_adapter->packets_in_adapter > 0) {
648         ret = empty_adapter_into_pad (ts_adapter, pad);
649         ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
650       }
651     }
652   }
653
654   return ret;
655 }
656
657 static GstFlowReturn
658 mpegts_parse_tspad_push_section (MpegTSParse2 * parse, MpegTSParsePad * tspad,
659     GstMpegtsSection * section, MpegTSPacketizerPacket * packet)
660 {
661   GstFlowReturn ret = GST_FLOW_OK;
662   gboolean to_push = TRUE;
663
664   if (tspad->program_number != -1) {
665     if (tspad->program) {
666       /* we push all sections to all pads except PMTs which we
667        * only push to pads meant to receive that program number */
668       if (section->table_id == 0x02) {
669         /* PMT */
670         if (section->subtable_extension != tspad->program_number)
671           to_push = FALSE;
672       }
673     } else if (section->table_id != 0x00) {
674       /* there's a program filter on the pad but the PMT for the program has not
675        * been parsed yet, ignore the pad until we get a PMT.
676        * But we always allow PAT to go through */
677       to_push = FALSE;
678     }
679   }
680
681   GST_DEBUG_OBJECT (parse,
682       "pushing section: %d program number: %d table_id: %d", to_push,
683       tspad->program_number, section->table_id);
684
685   if (to_push) {
686     GstBuffer *buf = mpegts_packet_to_buffer (packet);
687     ret =
688         enqueue_and_maybe_push_buffer (parse, tspad->pad, &tspad->ts_adapter,
689         buf);
690   }
691
692   GST_LOG_OBJECT (parse, "Returning %s", gst_flow_get_name (ret));
693   return ret;
694 }
695
696 static GstFlowReturn
697 mpegts_parse_tspad_push (MpegTSParse2 * parse, MpegTSParsePad * tspad,
698     MpegTSPacketizerPacket * packet)
699 {
700   GstFlowReturn ret = GST_FLOW_OK;
701   MpegTSBaseProgram *bp = NULL;
702
703   if (tspad->program_number != -1) {
704     if (tspad->program)
705       bp = (MpegTSBaseProgram *) tspad->program;
706     else
707       bp = mpegts_base_get_program ((MpegTSBase *) parse,
708           tspad->program_number);
709   }
710
711   if (bp) {
712     if (packet->pid == bp->pmt_pid || bp->streams == NULL
713         || bp->streams[packet->pid]) {
714       GstBuffer *buf = mpegts_packet_to_buffer (packet);
715       /* push if there's no filter or if the pid is in the filter */
716       ret = gst_pad_push (tspad->pad, buf);
717       ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
718     }
719   }
720   GST_DEBUG_OBJECT (parse, "Returning %s", gst_flow_get_name (ret));
721
722   return ret;
723 }
724
725 static void
726 pad_clear_for_push (GstPad * pad, MpegTSParse2 * parse)
727 {
728   MpegTSParsePad *tspad = (MpegTSParsePad *) gst_pad_get_element_private (pad);
729
730   tspad->flow_return = GST_FLOW_NOT_LINKED;
731   tspad->pushed = FALSE;
732 }
733
734 static GstFlowReturn
735 mpegts_parse_push (MpegTSBase * base, MpegTSPacketizerPacket * packet,
736     GstMpegtsSection * section)
737 {
738   MpegTSParse2 *parse = (MpegTSParse2 *) base;
739   guint32 pads_cookie;
740   gboolean done = FALSE;
741   GstPad *pad = NULL;
742   MpegTSParsePad *tspad;
743   GstFlowReturn ret;
744   GList *srcpads;
745   GstBuffer *buf;
746
747   GST_OBJECT_LOCK (parse);
748   srcpads = parse->srcpads;
749
750   /* clear tspad->pushed on pads */
751   g_list_foreach (srcpads, (GFunc) pad_clear_for_push, parse);
752   if (srcpads)
753     ret = GST_FLOW_NOT_LINKED;
754   else
755     ret = GST_FLOW_OK;
756
757   /* Get cookie and source pads list */
758   pads_cookie = GST_ELEMENT_CAST (parse)->pads_cookie;
759   if (G_LIKELY (srcpads)) {
760     pad = GST_PAD_CAST (srcpads->data);
761     g_object_ref (pad);
762   }
763   GST_OBJECT_UNLOCK (parse);
764
765   buf = mpegts_packet_to_buffer (packet);
766   ret = mpegts_parse_have_buffer (base, buf);
767
768   while (pad && !done) {
769     tspad = gst_pad_get_element_private (pad);
770
771     if (G_LIKELY (!tspad->pushed)) {
772       if (section) {
773         tspad->flow_return =
774             mpegts_parse_tspad_push_section (parse, tspad, section, packet);
775       } else {
776         tspad->flow_return = mpegts_parse_tspad_push (parse, tspad, packet);
777       }
778       tspad->pushed = TRUE;
779
780       if (G_UNLIKELY (tspad->flow_return != GST_FLOW_OK
781               && tspad->flow_return != GST_FLOW_NOT_LINKED)) {
782         /* return the error upstream */
783         ret = tspad->flow_return;
784         done = TRUE;
785       }
786
787     }
788
789     if (ret == GST_FLOW_NOT_LINKED)
790       ret = tspad->flow_return;
791
792     g_object_unref (pad);
793
794     if (G_UNLIKELY (!done)) {
795       GST_OBJECT_LOCK (parse);
796       if (G_UNLIKELY (pads_cookie != GST_ELEMENT_CAST (parse)->pads_cookie)) {
797         /* resync */
798         GST_DEBUG ("resync");
799         pads_cookie = GST_ELEMENT_CAST (parse)->pads_cookie;
800         srcpads = parse->srcpads;
801       } else {
802         GST_DEBUG ("getting next pad");
803         /* Get next pad */
804         srcpads = g_list_next (srcpads);
805       }
806
807       if (srcpads) {
808         pad = GST_PAD_CAST (srcpads->data);
809         g_object_ref (pad);
810       } else
811         done = TRUE;
812       GST_OBJECT_UNLOCK (parse);
813     }
814   }
815
816   return ret;
817 }
818
819 static void
820 mpegts_parse_inspect_packet (MpegTSBase * base, MpegTSPacketizerPacket * packet)
821 {
822   MpegTSParse2 *parse = GST_MPEGTS_PARSE (base);
823   GST_LOG ("pid 0x%04x pusi:%d, afc:%d, cont:%d, payload:%p PCR %"
824       G_GUINT64_FORMAT, packet->pid, packet->payload_unit_start_indicator,
825       packet->scram_afc_cc & 0x30,
826       FLAGS_CONTINUITY_COUNTER (packet->scram_afc_cc), packet->payload,
827       packet->pcr);
828
829   /* Store the PCR if desired */
830   if (parse->current_pcr == GST_CLOCK_TIME_NONE &&
831       packet->afc_flags & MPEGTS_AFC_PCR_FLAG) {
832     /* Take this as the pcr_pid if set to auto-select */
833     if (parse->pcr_pid == -1)
834       parse->pcr_pid = packet->pid;
835     /* Check the PCR-PID matches the program we want for multiple programs */
836     if (parse->pcr_pid == packet->pid) {
837       parse->current_pcr = mpegts_packetizer_pts_to_ts (base->packetizer,
838           PCRTIME_TO_GSTTIME (packet->pcr), parse->pcr_pid);
839       GST_DEBUG ("Got new PCR %" GST_TIME_FORMAT " raw %" G_GUINT64_FORMAT,
840           GST_TIME_ARGS (parse->current_pcr), packet->pcr);
841       if (parse->base_pcr == GST_CLOCK_TIME_NONE) {
842         parse->base_pcr = parse->current_pcr;
843       }
844     }
845   }
846 }
847
848 static GstClockTime
849 get_pending_timestamp_diff (MpegTSParse2 * parse)
850 {
851   GList *l;
852   GstClockTime first_ts, last_ts;
853
854   if (parse->pending_buffers == NULL)
855     return GST_CLOCK_TIME_NONE;
856
857   l = g_list_last (parse->pending_buffers);
858   first_ts = GST_BUFFER_PTS (l->data);
859   if (first_ts == GST_CLOCK_TIME_NONE)
860     return GST_CLOCK_TIME_NONE;
861
862   l = g_list_first (parse->pending_buffers);
863   last_ts = GST_BUFFER_PTS (l->data);
864   if (last_ts == GST_CLOCK_TIME_NONE)
865     return GST_CLOCK_TIME_NONE;
866
867   return last_ts - first_ts;
868 }
869
870 static GstFlowReturn
871 drain_pending_buffers (MpegTSParse2 * parse, gboolean drain_all)
872 {
873   GstFlowReturn ret = GST_FLOW_OK;
874   GstClockTime start_ts;
875   GstClockTime pcr = GST_CLOCK_TIME_NONE;
876   GstClockTime pcr_diff = 0;
877   gsize pcr_bytes, bytes_since_pcr, pos;
878   GstBuffer *buffer;
879   GList *l, *end = NULL;
880
881   if (parse->pending_buffers == NULL)
882     return GST_FLOW_OK;         /* Nothing to push */
883
884   /*
885    * There are 4 cases:
886    *  1 We get a buffer with no PCR -> it's the head of the list
887    *      -> Do nothing, unless it's EOS
888    *  2 We get a buffer with a PCR, it's the first PCR we've seen, and belongs
889    *    to the buffer at the head of the list
890    *    -> Push any buffers in the list except the head,
891    *       using a smoothing of their timestamps to land at the PCR
892    *    -> store new PCR as the previous PCR, bytes_since_pcr = sizeof (buffer);
893    *  3 It's EOS (drain_all == TRUE, current_pcr == NONE)
894    *    -> Push any buffers in the list using a smoothing of their timestamps
895    *       starting at the previous PCR or first TS
896    *  4 We get a buffer with a PCR, and have a previous PCR
897    *    -> If distance > smoothing_latency,
898    *       output buffers except the last in the pending queue using
899    *       piecewise-linear timestamps
900    *    -> store new PCR as the previous PCR, bytes_since_pcr = sizeof (buffer);
901    */
902
903   /* Case 1 */
904   if (!GST_CLOCK_TIME_IS_VALID (parse->current_pcr) && !drain_all)
905     return GST_FLOW_OK;
906
907   if (GST_CLOCK_TIME_IS_VALID (parse->current_pcr)) {
908     pcr = parse->current_pcr;
909     parse->current_pcr = GST_CLOCK_TIME_NONE;
910   }
911
912   /* The bytes of the last buffer are after the PCR */
913   buffer = GST_BUFFER (g_list_nth_data (parse->pending_buffers, 0));
914   bytes_since_pcr = gst_buffer_get_size (buffer);
915
916   pcr_bytes = parse->bytes_since_pcr - bytes_since_pcr;
917
918   if (!drain_all)
919     end = g_list_first (parse->pending_buffers);
920
921   /* Case 2 */
922   if (!GST_CLOCK_TIME_IS_VALID (parse->previous_pcr)) {
923     pcr_diff = get_pending_timestamp_diff (parse);
924
925     /* Calculate the start_ts that ends at the end timestamp */
926     start_ts = GST_CLOCK_TIME_NONE;
927     if (end) {
928       start_ts = GST_BUFFER_PTS (GST_BUFFER (end->data));
929       if (start_ts > pcr_diff)
930         start_ts -= pcr_diff;
931     }
932   } else if (drain_all) {       /* Case 3 */
933     start_ts = parse->previous_pcr;
934     pcr_diff = get_pending_timestamp_diff (parse);
935   } else {                      /* Case 4 */
936     start_ts = parse->previous_pcr;
937     if (GST_CLOCK_TIME_IS_VALID (pcr) && pcr > start_ts)
938       pcr_diff = GST_CLOCK_DIFF (start_ts, pcr);
939
940     /* Make sure PCR observations are sufficiently far apart */
941     if (drain_all == FALSE && pcr_diff < parse->smoothing_latency)
942       return GST_FLOW_OK;
943   }
944
945   GST_INFO_OBJECT (parse, "Pushing buffers - startTS %" GST_TIME_FORMAT
946       " duration %" GST_TIME_FORMAT " %" G_GSIZE_FORMAT " bytes",
947       GST_TIME_ARGS (start_ts), GST_TIME_ARGS (pcr_diff), pcr_bytes);
948
949   /* Now, push buffers out pacing timestamps over pcr_diff time and pcr_bytes */
950   pos = 0;
951   l = g_list_last (parse->pending_buffers);
952   while (l != end) {
953     GList *p;
954     GstClockTime out_ts = start_ts;
955
956     buffer = gst_buffer_make_writable (GST_BUFFER (l->data));
957
958     if (out_ts != GST_CLOCK_TIME_NONE && pcr_diff != GST_CLOCK_TIME_NONE &&
959         pcr_bytes && pos)
960       out_ts += gst_util_uint64_scale (pcr_diff, pos, pcr_bytes);
961
962     pos += gst_buffer_get_size (buffer);
963
964     GST_DEBUG_OBJECT (parse,
965         "InputTS %" GST_TIME_FORMAT " out %" GST_TIME_FORMAT,
966         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)), GST_TIME_ARGS (out_ts));
967
968     GST_BUFFER_PTS (buffer) = out_ts + parse->ts_offset;
969     GST_BUFFER_DTS (buffer) = out_ts + parse->ts_offset;
970     if (ret == GST_FLOW_OK) {
971       ret =
972           enqueue_and_maybe_push_buffer (parse, parse->srcpad,
973           &parse->ts_adapter, buffer);
974     } else {
975       gst_buffer_unref (buffer);
976     }
977
978     /* Free this list node and move to the next */
979     p = g_list_previous (l);
980     parse->pending_buffers = g_list_delete_link (parse->pending_buffers, l);
981     l = p;
982   }
983
984   if (parse->is_eos) {
985     empty_adapter_into_pad (&parse->ts_adapter, parse->srcpad);
986   }
987
988   parse->pending_buffers = end;
989   parse->bytes_since_pcr = bytes_since_pcr;
990   parse->previous_pcr = pcr;
991   return ret;
992 }
993
994 static GstFlowReturn
995 mpegts_parse_have_buffer (MpegTSBase * base, GstBuffer * buffer)
996 {
997   MpegTSParse2 *parse = GST_MPEGTS_PARSE (base);
998   GstFlowReturn ret = GST_FLOW_OK;
999
1000   GST_LOG_OBJECT (parse, "Received buffer %" GST_PTR_FORMAT, buffer);
1001
1002   /* Assume all packets have equal size */
1003   if (parse->alignment > 0 &&
1004       base->packetizer->packet_size != MPEGTS_NORMAL_PACKETSIZE) {
1005     GstMapInfo map;
1006     guint8 *data;
1007
1008     gst_buffer_map (buffer, &map, GST_MAP_READ);
1009     data = map.data;
1010
1011     parse->header = GST_READ_UINT32_BE (data);
1012     gst_buffer_unmap (buffer, &map);
1013   }
1014
1015   if (parse->current_pcr != GST_CLOCK_TIME_NONE) {
1016     GST_DEBUG_OBJECT (parse,
1017         "InputTS %" GST_TIME_FORMAT " PCR %" GST_TIME_FORMAT,
1018         GST_TIME_ARGS (GST_BUFFER_PTS (buffer)),
1019         GST_TIME_ARGS (parse->current_pcr));
1020   }
1021
1022   if (parse->set_timestamps || parse->first) {
1023     parse->pending_buffers = g_list_prepend (parse->pending_buffers, buffer);
1024     parse->bytes_since_pcr += gst_buffer_get_size (buffer);
1025     buffer = NULL;
1026   }
1027
1028   if (!prepare_src_pad (base, parse))
1029     return GST_FLOW_OK;
1030
1031   if (parse->pending_buffers != NULL) {
1032     /* Don't keep pending_buffers if not setting output timestamps */
1033     gboolean drain_all = (parse->set_timestamps == FALSE);
1034     ret = drain_pending_buffers (parse, drain_all);
1035     if (ret != GST_FLOW_OK) {
1036       if (buffer)
1037         gst_buffer_unref (buffer);
1038       return ret;
1039     }
1040   }
1041
1042   ret =
1043       enqueue_and_maybe_push_buffer (parse, parse->srcpad, &parse->ts_adapter,
1044       buffer);
1045
1046   return ret;
1047 }
1048
1049 static void
1050 empty_pad (GstPad * pad, MpegTSParse2 * parse)
1051 {
1052   MpegTSParsePad *tspad = (MpegTSParsePad *) gst_pad_get_element_private (pad);
1053   GstFlowReturn ret;
1054
1055   ret = empty_adapter_into_pad (&tspad->ts_adapter, tspad->pad);
1056   ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
1057 }
1058
1059 static GstFlowReturn
1060 mpegts_parse_input_done (MpegTSBase * base, GstBuffer * buffer)
1061 {
1062   MpegTSParse2 *parse = GST_MPEGTS_PARSE (base);
1063   GstFlowReturn ret = GST_FLOW_OK;
1064
1065   if (!prepare_src_pad (base, parse))
1066     return GST_FLOW_OK;
1067
1068   if (parse->alignment == 0) {
1069     ret = empty_adapter_into_pad (&parse->ts_adapter, parse->srcpad);
1070     ret = gst_flow_combiner_update_flow (parse->flowcombiner, ret);
1071     g_list_foreach (parse->srcpads, (GFunc) empty_pad, parse);
1072   }
1073   return ret;
1074 }
1075
1076 static MpegTSParsePad *
1077 find_pad_for_program (MpegTSParse2 * parse, guint program_number)
1078 {
1079   GList *tmp;
1080
1081   for (tmp = parse->srcpads; tmp; tmp = tmp->next) {
1082     MpegTSParsePad *tspad = gst_pad_get_element_private ((GstPad *) tmp->data);
1083
1084     if (tspad->program_number == program_number)
1085       return tspad;
1086   }
1087
1088   return NULL;
1089 }
1090
1091 static void
1092 mpegts_parse_program_started (MpegTSBase * base, MpegTSBaseProgram * program)
1093 {
1094   MpegTSParse2 *parse = GST_MPEGTS_PARSE (base);
1095   MpegTSParseProgram *parseprogram = (MpegTSParseProgram *) program;
1096   MpegTSParsePad *tspad;
1097
1098   /* If we have a request pad for that program, activate it */
1099   tspad = find_pad_for_program (parse, program->program_number);
1100
1101   if (tspad) {
1102     tspad->program = parseprogram;
1103     parseprogram->tspad = tspad;
1104   }
1105 }
1106
1107 static void
1108 mpegts_parse_program_stopped (MpegTSBase * base, MpegTSBaseProgram * program)
1109 {
1110   MpegTSParse2 *parse = GST_MPEGTS_PARSE (base);
1111   MpegTSParseProgram *parseprogram = (MpegTSParseProgram *) program;
1112   MpegTSParsePad *tspad;
1113
1114   /* If we have a request pad for that program, activate it */
1115   tspad = find_pad_for_program (parse, program->program_number);
1116
1117   if (tspad) {
1118     tspad->program = NULL;
1119     parseprogram->tspad = NULL;
1120   }
1121
1122   parse->pcr_pid = -1;
1123   parse->ts_offset += parse->current_pcr - parse->base_pcr;
1124   parse->base_pcr = GST_CLOCK_TIME_NONE;
1125 }
1126
1127 static gboolean
1128 mpegts_parse_src_pad_query (GstPad * pad, GstObject * parent, GstQuery * query)
1129 {
1130   MpegTSParse2 *parse = GST_MPEGTS_PARSE (parent);
1131   gboolean res;
1132
1133   switch (GST_QUERY_TYPE (query)) {
1134     case GST_QUERY_LATENCY:
1135     {
1136       if ((res = gst_pad_peer_query (((MpegTSBase *) parse)->sinkpad, query))) {
1137         gboolean is_live;
1138         GstClockTime min_latency, max_latency;
1139
1140         gst_query_parse_latency (query, &is_live, &min_latency, &max_latency);
1141         if (is_live) {
1142           GstClockTime extra_latency = TS_LATENCY * GST_MSECOND;
1143           if (parse->set_timestamps) {
1144             extra_latency = MAX (extra_latency, parse->smoothing_latency);
1145           }
1146           min_latency += extra_latency;
1147           if (max_latency != GST_CLOCK_TIME_NONE)
1148             max_latency += extra_latency;
1149         }
1150
1151         gst_query_set_latency (query, is_live, min_latency, max_latency);
1152       }
1153       break;
1154     }
1155     default:
1156       res = gst_pad_query_default (pad, parent, query);
1157   }
1158   return res;
1159 }
1160
1161 gboolean
1162 gst_mpegtsparse_plugin_init (GstPlugin * plugin)
1163 {
1164   GST_DEBUG_CATEGORY_INIT (mpegts_parse_debug, "tsparse", 0,
1165       "MPEG transport stream parser");
1166
1167   return gst_element_register (plugin, "tsparse",
1168       GST_RANK_NONE, GST_TYPE_MPEGTS_PARSE);
1169 }