Merging gst-libav
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-bad / gst / mpegtsdemux / mpegtspacketizer.c
1 /*
2  * mpegtspacketizer.c -
3  * Copyright (C) 2007, 2008 Alessandro Decina, Zaheer Merali
4  *
5  * Authors:
6  *   Zaheer Merali <zaheerabbas at merali dot org>
7  *   Alessandro Decina <alessandro@nnva.org>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24 #ifdef HAVE_CONFIG_H
25 #include "config.h"
26 #endif
27
28 #include <string.h>
29 #include <stdlib.h>
30
31 /* Skew calculation pameters */
32 #define MAX_TIME        (2 * GST_SECOND)
33
34 /* maximal PCR time */
35 #define PCR_MAX_VALUE (((((guint64)1)<<33) * 300) + 298)
36 #define PCR_GST_MAX_VALUE (PCR_MAX_VALUE * GST_MSECOND / (PCR_MSECOND))
37 #define PTS_DTS_MAX_VALUE (((guint64)1) << 33)
38
39 #include "mpegtspacketizer.h"
40 #include "gstmpegdesc.h"
41
42 GST_DEBUG_CATEGORY_STATIC (mpegts_packetizer_debug);
43 #define GST_CAT_DEFAULT mpegts_packetizer_debug
44
45 static void _init_local (void);
46 G_DEFINE_TYPE_EXTENDED (MpegTSPacketizer2, mpegts_packetizer, G_TYPE_OBJECT, 0,
47     _init_local ());
48
49 #define ABSDIFF(a,b) ((a) < (b) ? (b) - (a) : (a) - (b))
50
51 #define PACKETIZER_GROUP_LOCK(p) g_mutex_lock(&((p)->group_lock))
52 #define PACKETIZER_GROUP_UNLOCK(p) g_mutex_unlock(&((p)->group_lock))
53
54 static void mpegts_packetizer_dispose (GObject * object);
55 static void mpegts_packetizer_finalize (GObject * object);
56 static GstClockTime calculate_skew (MpegTSPacketizer2 * packetizer,
57     MpegTSPCR * pcr, guint64 pcrtime, GstClockTime time);
58 static void _close_current_group (MpegTSPCR * pcrtable);
59 static void record_pcr (MpegTSPacketizer2 * packetizer, MpegTSPCR * pcrtable,
60     guint64 pcr, guint64 offset);
61
62 #define CONTINUITY_UNSET 255
63 #define VERSION_NUMBER_UNSET 255
64 #define TABLE_ID_UNSET 0xFF
65 #define PACKET_SYNC_BYTE 0x47
66
67 static inline MpegTSPCR *
68 get_pcr_table (MpegTSPacketizer2 * packetizer, guint16 pid)
69 {
70   MpegTSPCR *res;
71
72   res = packetizer->observations[packetizer->pcrtablelut[pid]];
73
74   if (G_UNLIKELY (res == NULL)) {
75     /* If we don't have a PCR table for the requested PID, create one .. */
76     res = g_new0 (MpegTSPCR, 1);
77     /* Add it to the last table position */
78     packetizer->observations[packetizer->lastobsid] = res;
79     /* Update the pcrtablelut */
80     packetizer->pcrtablelut[pid] = packetizer->lastobsid;
81     /* And increment the last know slot */
82     packetizer->lastobsid++;
83
84     /* Finally set the default values */
85     res->pid = pid;
86     res->base_time = GST_CLOCK_TIME_NONE;
87     res->base_pcrtime = GST_CLOCK_TIME_NONE;
88     res->last_pcrtime = GST_CLOCK_TIME_NONE;
89     res->window_pos = 0;
90     res->window_filling = TRUE;
91     res->window_min = 0;
92     res->skew = 0;
93     res->prev_send_diff = GST_CLOCK_TIME_NONE;
94     res->prev_out_time = GST_CLOCK_TIME_NONE;
95     res->pcroffset = 0;
96
97     res->current = g_slice_new0 (PCROffsetCurrent);
98   }
99
100   return res;
101 }
102
103 static void
104 pcr_offset_group_free (PCROffsetGroup * group)
105 {
106   g_free (group->values);
107   g_slice_free (PCROffsetGroup, group);
108 }
109
110 static void
111 flush_observations (MpegTSPacketizer2 * packetizer)
112 {
113   gint i;
114
115   for (i = 0; i < packetizer->lastobsid; i++) {
116     g_list_free_full (packetizer->observations[i]->groups,
117         (GDestroyNotify) pcr_offset_group_free);
118     if (packetizer->observations[i]->current)
119       g_slice_free (PCROffsetCurrent, packetizer->observations[i]->current);
120     g_free (packetizer->observations[i]);
121     packetizer->observations[i] = NULL;
122   }
123   memset (packetizer->pcrtablelut, 0xff, 0x2000);
124   packetizer->lastobsid = 0;
125 }
126
127 GstClockTime
128 mpegts_packetizer_get_current_time (MpegTSPacketizer2 * packetizer,
129     guint16 pcr_pid)
130 {
131   MpegTSPCR *pcrtable = get_pcr_table (packetizer, pcr_pid);
132
133   if (pcrtable == NULL)
134     return GST_CLOCK_TIME_NONE;
135
136   return mpegts_packetizer_pts_to_ts (packetizer, pcrtable->last_pcrtime,
137       pcr_pid);
138 }
139
140 static inline MpegTSPacketizerStreamSubtable *
141 find_subtable (GSList * subtables, guint8 table_id, guint16 subtable_extension)
142 {
143   GSList *tmp;
144
145   /* FIXME: Make this an array ! */
146   for (tmp = subtables; tmp; tmp = tmp->next) {
147     MpegTSPacketizerStreamSubtable *sub =
148         (MpegTSPacketizerStreamSubtable *) tmp->data;
149     if (sub->table_id == table_id
150         && sub->subtable_extension == subtable_extension)
151       return sub;
152   }
153
154   return NULL;
155 }
156
157 static gboolean
158 seen_section_before (MpegTSPacketizerStream * stream, guint8 table_id,
159     guint16 subtable_extension, guint8 version_number, guint8 section_number,
160     guint8 last_section_number, guint8 * data_start, gsize to_read)
161 {
162   MpegTSPacketizerStreamSubtable *subtable;
163
164   /* Check if we've seen this table_id/subtable_extension first */
165   subtable = find_subtable (stream->subtables, table_id, subtable_extension);
166   if (!subtable) {
167     GST_DEBUG ("Haven't seen subtable");
168     return FALSE;
169   }
170   /* If we have, check it has the same version_number */
171   if (subtable->version_number != version_number) {
172     GST_DEBUG ("Different version number");
173     return FALSE;
174   }
175   /* Did the number of sections change ? */
176   if (subtable->last_section_number != last_section_number) {
177     GST_DEBUG ("Different last_section_number");
178     return FALSE;
179   }
180   /* Finally return whether we saw that section or not */
181   if (!MPEGTS_BIT_IS_SET (subtable->seen_section, section_number)) {
182     GST_DEBUG ("Different section_number");
183     return FALSE;
184   }
185
186   if (stream->section_data) {
187     /* Everything else is the same, fall back to memcmp */
188     return (memcmp (stream->section_data, data_start, to_read) != 0);
189   }
190
191   return FALSE;
192 }
193
194 static MpegTSPacketizerStreamSubtable *
195 mpegts_packetizer_stream_subtable_new (guint8 table_id,
196     guint16 subtable_extension, guint8 last_section_number)
197 {
198   MpegTSPacketizerStreamSubtable *subtable;
199
200   subtable = g_new0 (MpegTSPacketizerStreamSubtable, 1);
201   subtable->version_number = VERSION_NUMBER_UNSET;
202   subtable->table_id = table_id;
203   subtable->subtable_extension = subtable_extension;
204   subtable->last_section_number = last_section_number;
205   return subtable;
206 }
207
208 static MpegTSPacketizerStream *
209 mpegts_packetizer_stream_new (guint16 pid)
210 {
211   MpegTSPacketizerStream *stream;
212
213   stream = (MpegTSPacketizerStream *) g_new0 (MpegTSPacketizerStream, 1);
214   stream->continuity_counter = CONTINUITY_UNSET;
215   stream->subtables = NULL;
216   stream->table_id = TABLE_ID_UNSET;
217   stream->pid = pid;
218   return stream;
219 }
220
221 static void
222 mpegts_packetizer_clear_section (MpegTSPacketizerStream * stream)
223 {
224   stream->continuity_counter = CONTINUITY_UNSET;
225   stream->section_length = 0;
226   stream->section_offset = 0;
227   stream->table_id = TABLE_ID_UNSET;
228   g_free (stream->section_data);
229   stream->section_data = NULL;
230 }
231
232 static void
233 mpegts_packetizer_stream_subtable_free (MpegTSPacketizerStreamSubtable *
234     subtable)
235 {
236   g_free (subtable);
237 }
238
239 static void
240 mpegts_packetizer_stream_free (MpegTSPacketizerStream * stream)
241 {
242   mpegts_packetizer_clear_section (stream);
243   g_slist_foreach (stream->subtables,
244       (GFunc) mpegts_packetizer_stream_subtable_free, NULL);
245   g_slist_free (stream->subtables);
246   g_free (stream);
247 }
248
249 static void
250 mpegts_packetizer_class_init (MpegTSPacketizer2Class * klass)
251 {
252   GObjectClass *gobject_class;
253
254   gobject_class = G_OBJECT_CLASS (klass);
255
256   gobject_class->dispose = mpegts_packetizer_dispose;
257   gobject_class->finalize = mpegts_packetizer_finalize;
258 }
259
260 static void
261 mpegts_packetizer_init (MpegTSPacketizer2 * packetizer)
262 {
263   g_mutex_init (&packetizer->group_lock);
264
265   packetizer->adapter = gst_adapter_new ();
266   packetizer->offset = 0;
267   packetizer->empty = TRUE;
268   packetizer->streams = g_new0 (MpegTSPacketizerStream *, 8192);
269   packetizer->packet_size = 0;
270   packetizer->calculate_skew = FALSE;
271   packetizer->calculate_offset = FALSE;
272
273   packetizer->map_data = NULL;
274   packetizer->map_size = 0;
275   packetizer->map_offset = 0;
276   packetizer->need_sync = FALSE;
277
278   memset (packetizer->pcrtablelut, 0xff, 0x2000);
279   memset (packetizer->observations, 0x0, sizeof (packetizer->observations));
280   packetizer->lastobsid = 0;
281
282   packetizer->nb_seen_offsets = 0;
283   packetizer->refoffset = -1;
284   packetizer->last_in_time = GST_CLOCK_TIME_NONE;
285   packetizer->pcr_discont_threshold = GST_SECOND;
286   packetizer->last_pts = GST_CLOCK_TIME_NONE;
287   packetizer->last_dts = GST_CLOCK_TIME_NONE;
288 }
289
290 static void
291 mpegts_packetizer_dispose (GObject * object)
292 {
293   MpegTSPacketizer2 *packetizer = GST_MPEGTS_PACKETIZER (object);
294
295   if (!packetizer->disposed) {
296     if (packetizer->packet_size)
297       packetizer->packet_size = 0;
298     if (packetizer->streams) {
299       int i;
300       for (i = 0; i < 8192; i++) {
301         if (packetizer->streams[i])
302           mpegts_packetizer_stream_free (packetizer->streams[i]);
303       }
304       g_free (packetizer->streams);
305     }
306
307     gst_adapter_clear (packetizer->adapter);
308     g_object_unref (packetizer->adapter);
309     g_mutex_clear (&packetizer->group_lock);
310     packetizer->disposed = TRUE;
311     packetizer->offset = 0;
312     packetizer->empty = TRUE;
313
314     flush_observations (packetizer);
315   }
316
317   if (G_OBJECT_CLASS (mpegts_packetizer_parent_class)->dispose)
318     G_OBJECT_CLASS (mpegts_packetizer_parent_class)->dispose (object);
319 }
320
321 static void
322 mpegts_packetizer_finalize (GObject * object)
323 {
324   if (G_OBJECT_CLASS (mpegts_packetizer_parent_class)->finalize)
325     G_OBJECT_CLASS (mpegts_packetizer_parent_class)->finalize (object);
326 }
327
328 static inline guint64
329 mpegts_packetizer_compute_pcr (const guint8 * data)
330 {
331   guint32 pcr1;
332   guint16 pcr2;
333   guint64 pcr, pcr_ext;
334
335   pcr1 = GST_READ_UINT32_BE (data);
336   pcr2 = GST_READ_UINT16_BE (data + 4);
337   pcr = ((guint64) pcr1) << 1;
338   pcr |= (pcr2 & 0x8000) >> 15;
339   pcr_ext = (pcr2 & 0x01ff);
340   return pcr * 300 + pcr_ext % 300;
341 }
342
343 static gboolean
344 mpegts_packetizer_parse_adaptation_field_control (MpegTSPacketizer2 *
345     packetizer, MpegTSPacketizerPacket * packet)
346 {
347   guint8 length, afcflags;
348   guint8 *data;
349
350   length = *packet->data++;
351
352   /* an adaptation field with length 0 is valid and
353    * can be used to insert a single stuffing byte */
354   if (!length) {
355     packet->afc_flags = 0;
356     return TRUE;
357   }
358
359   if ((packet->scram_afc_cc & 0x30) == 0x20) {
360     /* no payload, adaptation field of 183 bytes */
361     if (length > 183) {
362       GST_WARNING ("PID 0x%04x afc == 0x%02x and length %d > 183",
363           packet->pid, packet->scram_afc_cc & 0x30, length);
364       return FALSE;
365     }
366     if (length != 183) {
367       GST_WARNING ("PID 0x%04x afc == 0x%02x and length %d != 183",
368           packet->pid, packet->scram_afc_cc & 0x30, length);
369       GST_MEMDUMP ("Unknown payload", packet->data + length,
370           packet->data_end - packet->data - length);
371     }
372   } else if (length == 183) {
373     /* Note: According to the specification, the adaptation field length
374      * must be 183 if there is no payload data and < 183 if the packet
375      * contains an adaptation field and payload data.
376      * Some payloaders always set the flag for payload data, even if the
377      * adaptation field length is 183. This just means a zero length
378      * payload so we clear the payload flag here and continue.
379      */
380     GST_WARNING ("PID 0x%04x afc == 0x%02x and length %d == 183 (ignored)",
381         packet->pid, packet->scram_afc_cc & 0x30, length);
382     packet->scram_afc_cc &= ~0x10;
383   } else if (length > 182) {
384     GST_WARNING ("PID 0x%04x afc == 0x%02x and length %d > 182",
385         packet->pid, packet->scram_afc_cc & 0x30, length);
386     return FALSE;
387   }
388
389   if (packet->data + length > packet->data_end) {
390     GST_DEBUG
391         ("PID 0x%04x afc length %d overflows the buffer current %d max %d",
392         packet->pid, length, (gint) (packet->data - packet->data_start),
393         (gint) (packet->data_end - packet->data_start));
394     return FALSE;
395   }
396
397   data = packet->data;
398   packet->data += length;
399
400   afcflags = packet->afc_flags = *data++;
401
402   GST_DEBUG ("flags: %s%s%s%s%s%s%s%s%s",
403       afcflags & 0x80 ? "discontinuity " : "",
404       afcflags & 0x40 ? "random_access " : "",
405       afcflags & 0x20 ? "elementary_stream_priority " : "",
406       afcflags & 0x10 ? "PCR " : "",
407       afcflags & 0x08 ? "OPCR " : "",
408       afcflags & 0x04 ? "splicing_point " : "",
409       afcflags & 0x02 ? "transport_private_data " : "",
410       afcflags & 0x01 ? "extension " : "", afcflags == 0x00 ? "<none>" : "");
411
412   /* PCR */
413   if (afcflags & MPEGTS_AFC_PCR_FLAG) {
414     MpegTSPCR *pcrtable = NULL;
415     packet->pcr = mpegts_packetizer_compute_pcr (data);
416     data += 6;
417     GST_DEBUG ("pcr 0x%04x %" G_GUINT64_FORMAT " (%" GST_TIME_FORMAT
418         ") offset:%" G_GUINT64_FORMAT, packet->pid, packet->pcr,
419         GST_TIME_ARGS (PCRTIME_TO_GSTTIME (packet->pcr)), packet->offset);
420
421     PACKETIZER_GROUP_LOCK (packetizer);
422     if (packetizer->calculate_skew
423         && GST_CLOCK_TIME_IS_VALID (packetizer->last_in_time)) {
424       pcrtable = get_pcr_table (packetizer, packet->pid);
425       calculate_skew (packetizer, pcrtable, packet->pcr,
426           packetizer->last_in_time);
427     }
428     if (packetizer->calculate_offset) {
429       if (!pcrtable)
430         pcrtable = get_pcr_table (packetizer, packet->pid);
431       record_pcr (packetizer, pcrtable, packet->pcr, packet->offset);
432     }
433     PACKETIZER_GROUP_UNLOCK (packetizer);
434   }
435 #ifndef GST_DISABLE_GST_DEBUG
436   /* OPCR */
437   if (afcflags & MPEGTS_AFC_OPCR_FLAG) {
438     /* Note: We don't use/need opcr for the time being */
439     guint64 opcr = mpegts_packetizer_compute_pcr (data);
440     data += 6;
441     GST_DEBUG ("opcr %" G_GUINT64_FORMAT " (%" GST_TIME_FORMAT ")",
442         opcr, GST_TIME_ARGS (PCRTIME_TO_GSTTIME (opcr)));
443   }
444
445   if (afcflags & MPEGTS_AFC_SPLICING_POINT_FLAG) {
446     GST_DEBUG ("splice_countdown: %u", *data++);
447   }
448
449   if (afcflags & MPEGTS_AFC_TRANSPORT_PRIVATE_DATA_FLAG) {
450     guint8 len = *data++;
451     GST_MEMDUMP ("private data", data, len);
452     data += len;
453   }
454
455   if (afcflags & MPEGTS_AFC_EXTENSION_FLAG) {
456     guint8 extlen = *data++;
457     guint8 flags = *data++;
458     GST_DEBUG ("extension size:%d flags : %s%s%s", extlen,
459         flags & 0x80 ? "ltw " : "",
460         flags & 0x40 ? "piecewise_rate " : "",
461         flags & 0x20 ? "seamless_splice " : "");
462     if (flags & 0x80) {
463       GST_DEBUG ("legal time window: valid_flag:%d offset:%d", *data >> 7,
464           GST_READ_UINT16_BE (data) & 0x7fff);
465       data += 2;
466     }
467   }
468 #endif
469
470   return TRUE;
471 }
472
473 static MpegTSPacketizerPacketReturn
474 mpegts_packetizer_parse_packet (MpegTSPacketizer2 * packetizer,
475     MpegTSPacketizerPacket * packet)
476 {
477   guint8 *data;
478   guint8 tmp;
479
480   data = packet->data_start;
481   data += 1;
482   tmp = *data;
483
484   /* transport_error_indicator 1 */
485   if (G_UNLIKELY (tmp & 0x80))
486     return PACKET_BAD;
487
488   /* payload_unit_start_indicator 1 */
489   packet->payload_unit_start_indicator = tmp & 0x40;
490
491   /* transport_priority 1 */
492   /* PID 13 */
493   packet->pid = GST_READ_UINT16_BE (data) & 0x1FFF;
494   data += 2;
495
496   packet->scram_afc_cc = tmp = *data++;
497   /* transport_scrambling_control 2 */
498   if (G_UNLIKELY (tmp & 0xc0))
499     return PACKET_BAD;
500
501   packet->data = data;
502
503   packet->afc_flags = 0;
504   packet->pcr = G_MAXUINT64;
505
506   if (FLAGS_HAS_AFC (tmp)) {
507     if (!mpegts_packetizer_parse_adaptation_field_control (packetizer, packet))
508       return FALSE;
509   }
510
511   if (FLAGS_HAS_PAYLOAD (packet->scram_afc_cc))
512     packet->payload = packet->data;
513   else
514     packet->payload = NULL;
515
516   return PACKET_OK;
517 }
518
519 static GstMpegtsSection *
520 mpegts_packetizer_parse_section_header (MpegTSPacketizer2 * packetizer,
521     MpegTSPacketizerStream * stream)
522 {
523   MpegTSPacketizerStreamSubtable *subtable;
524   GstMpegtsSection *res;
525
526   subtable =
527       find_subtable (stream->subtables, stream->table_id,
528       stream->subtable_extension);
529   if (subtable) {
530     GST_DEBUG ("Found previous subtable_extension:0x%04x",
531         stream->subtable_extension);
532     if (G_UNLIKELY (stream->version_number != subtable->version_number)) {
533       /* If the version number changed, reset the subtable */
534       subtable->version_number = stream->version_number;
535       subtable->last_section_number = stream->last_section_number;
536       memset (subtable->seen_section, 0, 32);
537     }
538   } else {
539     GST_DEBUG ("Appending new subtable_extension: 0x%04x",
540         stream->subtable_extension);
541     subtable = mpegts_packetizer_stream_subtable_new (stream->table_id,
542         stream->subtable_extension, stream->last_section_number);
543     subtable->version_number = stream->version_number;
544
545     stream->subtables = g_slist_prepend (stream->subtables, subtable);
546   }
547
548   GST_MEMDUMP ("Full section data", stream->section_data,
549       stream->section_length);
550   /* TODO ? : Replace this by an efficient version (where we provide all
551    * pre-parsed header data) */
552   res =
553       gst_mpegts_section_new (stream->pid, stream->section_data,
554       stream->section_length);
555   stream->section_data = NULL;
556   mpegts_packetizer_clear_section (stream);
557
558   if (res) {
559     /* NOTE : Due to the new mpegts-si system, There is a insanely low probability
560      * that we might have gotten a section that was corrupted (i.e. wrong crc)
561      * and that we consider it as seen.
562      *
563      * The reason why we consider this as acceptable is because all the previous
564      * checks were already done:
565      * * transport layer checks (DVB)
566      * * 0x47 validation
567      * * continuity counter validation
568      * * subtable validation
569      * * section_number validation
570      * * section_length validation
571      *
572      * The probability of this happening vs the overhead of doing CRC checks
573      * on all sections (including those we would not use) is just not worth it.
574      * */
575     MPEGTS_BIT_SET (subtable->seen_section, stream->section_number);
576     res->offset = stream->offset;
577   }
578
579   return res;
580 }
581
582 void
583 mpegts_packetizer_clear (MpegTSPacketizer2 * packetizer)
584 {
585   guint i;
586   MpegTSPCR *pcrtable;
587
588   packetizer->packet_size = 0;
589
590   if (packetizer->streams) {
591     int i;
592     for (i = 0; i < 8192; i++) {
593       if (packetizer->streams[i]) {
594         mpegts_packetizer_stream_free (packetizer->streams[i]);
595       }
596     }
597     memset (packetizer->streams, 0, 8192 * sizeof (MpegTSPacketizerStream *));
598   }
599
600   gst_adapter_clear (packetizer->adapter);
601   packetizer->offset = 0;
602   packetizer->empty = TRUE;
603   packetizer->need_sync = FALSE;
604   packetizer->map_data = NULL;
605   packetizer->map_size = 0;
606   packetizer->map_offset = 0;
607   packetizer->last_in_time = GST_CLOCK_TIME_NONE;
608   packetizer->last_pts = GST_CLOCK_TIME_NONE;
609   packetizer->last_dts = GST_CLOCK_TIME_NONE;
610
611   pcrtable = packetizer->observations[packetizer->pcrtablelut[0x1fff]];
612   if (pcrtable)
613     pcrtable->base_time = GST_CLOCK_TIME_NONE;
614
615   /* Close current PCR group */
616   PACKETIZER_GROUP_LOCK (packetizer);
617
618   for (i = 0; i < MAX_PCR_OBS_CHANNELS; i++) {
619     if (packetizer->observations[i])
620       _close_current_group (packetizer->observations[i]);
621     else
622       break;
623   }
624   PACKETIZER_GROUP_UNLOCK (packetizer);
625 }
626
627 void
628 mpegts_packetizer_flush (MpegTSPacketizer2 * packetizer, gboolean hard)
629 {
630   guint i;
631   MpegTSPCR *pcrtable;
632   GST_DEBUG ("Flushing");
633
634   if (packetizer->streams) {
635     for (i = 0; i < 8192; i++) {
636       if (packetizer->streams[i]) {
637         mpegts_packetizer_clear_section (packetizer->streams[i]);
638       }
639     }
640   }
641   gst_adapter_clear (packetizer->adapter);
642
643   packetizer->offset = 0;
644   packetizer->empty = TRUE;
645   packetizer->need_sync = FALSE;
646   packetizer->map_data = NULL;
647   packetizer->map_size = 0;
648   packetizer->map_offset = 0;
649   packetizer->last_in_time = GST_CLOCK_TIME_NONE;
650   packetizer->last_pts = GST_CLOCK_TIME_NONE;
651   packetizer->last_dts = GST_CLOCK_TIME_NONE;
652
653   pcrtable = packetizer->observations[packetizer->pcrtablelut[0x1fff]];
654   if (pcrtable)
655     pcrtable->base_time = GST_CLOCK_TIME_NONE;
656
657   /* Close current PCR group */
658   PACKETIZER_GROUP_LOCK (packetizer);
659   for (i = 0; i < MAX_PCR_OBS_CHANNELS; i++) {
660     if (packetizer->observations[i])
661       _close_current_group (packetizer->observations[i]);
662     else
663       break;
664   }
665   PACKETIZER_GROUP_UNLOCK (packetizer);
666
667   if (hard) {
668     /* For pull mode seeks in tsdemux the observation must be preserved */
669     flush_observations (packetizer);
670   }
671 }
672
673 void
674 mpegts_packetizer_remove_stream (MpegTSPacketizer2 * packetizer, gint16 pid)
675 {
676   MpegTSPacketizerStream *stream = packetizer->streams[pid];
677   if (stream) {
678     GST_INFO ("Removing stream for PID 0x%04x", pid);
679     mpegts_packetizer_stream_free (stream);
680     packetizer->streams[pid] = NULL;
681   }
682 }
683
684 MpegTSPacketizer2 *
685 mpegts_packetizer_new (void)
686 {
687   MpegTSPacketizer2 *packetizer;
688
689   packetizer =
690       GST_MPEGTS_PACKETIZER (g_object_new (GST_TYPE_MPEGTS_PACKETIZER, NULL));
691
692   return packetizer;
693 }
694
695 void
696 mpegts_packetizer_push (MpegTSPacketizer2 * packetizer, GstBuffer * buffer)
697 {
698   GstClockTime ts;
699   if (G_UNLIKELY (packetizer->empty)) {
700     packetizer->empty = FALSE;
701     packetizer->offset = GST_BUFFER_OFFSET (buffer);
702   }
703
704   GST_DEBUG ("Pushing %" G_GSIZE_FORMAT " byte from offset %"
705       G_GUINT64_FORMAT, gst_buffer_get_size (buffer),
706       GST_BUFFER_OFFSET (buffer));
707   gst_adapter_push (packetizer->adapter, buffer);
708   /* If the buffer has a valid timestamp, store it - preferring DTS,
709    * which is where upstream arrival times should be stored */
710   ts = GST_BUFFER_DTS_OR_PTS (buffer);
711   if (GST_CLOCK_TIME_IS_VALID (ts))
712     packetizer->last_in_time = ts;
713   packetizer->last_pts = GST_BUFFER_PTS (buffer);
714   packetizer->last_dts = GST_BUFFER_DTS (buffer);
715 }
716
717 static void
718 mpegts_packetizer_flush_bytes (MpegTSPacketizer2 * packetizer, gsize size)
719 {
720   if (size > 0) {
721     GST_LOG ("flushing %" G_GSIZE_FORMAT " bytes from adapter", size);
722     gst_adapter_flush (packetizer->adapter, size);
723   }
724
725   packetizer->map_data = NULL;
726   packetizer->map_size = 0;
727   packetizer->map_offset = 0;
728 }
729
730 static gboolean
731 mpegts_packetizer_map (MpegTSPacketizer2 * packetizer, gsize size)
732 {
733   gsize available;
734
735   if (packetizer->map_size - packetizer->map_offset >= size)
736     return TRUE;
737
738   mpegts_packetizer_flush_bytes (packetizer, packetizer->map_offset);
739
740   available = gst_adapter_available (packetizer->adapter);
741   if (available < size)
742     return FALSE;
743
744   packetizer->map_data =
745       (guint8 *) gst_adapter_map (packetizer->adapter, available);
746   if (!packetizer->map_data)
747     return FALSE;
748
749   packetizer->map_size = available;
750   packetizer->map_offset = 0;
751
752   GST_LOG ("mapped %" G_GSIZE_FORMAT " bytes from adapter", available);
753
754   return TRUE;
755 }
756
757 static gboolean
758 mpegts_try_discover_packet_size (MpegTSPacketizer2 * packetizer)
759 {
760   guint8 *data;
761   gsize size, i, j;
762
763   static const guint psizes[] = {
764     MPEGTS_NORMAL_PACKETSIZE,
765     MPEGTS_M2TS_PACKETSIZE,
766     MPEGTS_DVB_ASI_PACKETSIZE,
767     MPEGTS_ATSC_PACKETSIZE
768   };
769
770   if (!mpegts_packetizer_map (packetizer, 4 * MPEGTS_MAX_PACKETSIZE))
771     return FALSE;
772
773   size = packetizer->map_size - packetizer->map_offset;
774   data = packetizer->map_data + packetizer->map_offset;
775
776   for (i = 0; i + 3 * MPEGTS_MAX_PACKETSIZE < size; i++) {
777     /* find a sync byte */
778     if (data[i] != PACKET_SYNC_BYTE)
779       continue;
780
781     /* check for 4 consecutive sync bytes with each possible packet size */
782     for (j = 0; j < G_N_ELEMENTS (psizes); j++) {
783       guint packet_size = psizes[j];
784
785       if (data[i + packet_size] == PACKET_SYNC_BYTE &&
786           data[i + 2 * packet_size] == PACKET_SYNC_BYTE &&
787           data[i + 3 * packet_size] == PACKET_SYNC_BYTE) {
788         packetizer->packet_size = packet_size;
789         goto out;
790       }
791     }
792   }
793
794 out:
795   packetizer->map_offset += i;
796
797   if (packetizer->packet_size == 0) {
798     GST_DEBUG ("Could not determine packet size in %" G_GSIZE_FORMAT
799         " bytes buffer, flush %" G_GSIZE_FORMAT " bytes", size, i);
800     mpegts_packetizer_flush_bytes (packetizer, packetizer->map_offset);
801     return FALSE;
802   }
803
804   GST_INFO ("have packetsize detected: %u bytes", packetizer->packet_size);
805
806   if (packetizer->packet_size == MPEGTS_M2TS_PACKETSIZE &&
807       packetizer->map_offset >= 4)
808     packetizer->map_offset -= 4;
809
810   return TRUE;
811 }
812
813 static gboolean
814 mpegts_packetizer_sync (MpegTSPacketizer2 * packetizer)
815 {
816   gboolean found = FALSE;
817   guint8 *data;
818   guint packet_size;
819   gsize size, sync_offset, i;
820
821   packet_size = packetizer->packet_size;
822
823   if (!mpegts_packetizer_map (packetizer, 3 * packet_size))
824     return FALSE;
825
826   size = packetizer->map_size - packetizer->map_offset;
827   data = packetizer->map_data + packetizer->map_offset;
828
829   if (packet_size == MPEGTS_M2TS_PACKETSIZE)
830     sync_offset = 4;
831   else
832     sync_offset = 0;
833
834   for (i = sync_offset; i + 2 * packet_size < size; i++) {
835     if (data[i] == PACKET_SYNC_BYTE &&
836         data[i + packet_size] == PACKET_SYNC_BYTE &&
837         data[i + 2 * packet_size] == PACKET_SYNC_BYTE) {
838       found = TRUE;
839       break;
840     }
841   }
842
843   packetizer->map_offset += i - sync_offset;
844
845   if (!found)
846     mpegts_packetizer_flush_bytes (packetizer, packetizer->map_offset);
847
848   return found;
849 }
850
851 MpegTSPacketizerPacketReturn
852 mpegts_packetizer_next_packet (MpegTSPacketizer2 * packetizer,
853     MpegTSPacketizerPacket * packet)
854 {
855   guint8 *packet_data;
856   guint packet_size;
857   gsize sync_offset;
858
859   packet_size = packetizer->packet_size;
860   if (G_UNLIKELY (!packet_size)) {
861     if (!mpegts_try_discover_packet_size (packetizer))
862       return PACKET_NEED_MORE;
863     packet_size = packetizer->packet_size;
864   }
865
866   /* M2TS packets don't start with the sync byte, all other variants do */
867   if (packet_size == MPEGTS_M2TS_PACKETSIZE)
868     sync_offset = 4;
869   else
870     sync_offset = 0;
871
872   while (1) {
873     if (packetizer->need_sync) {
874       if (!mpegts_packetizer_sync (packetizer))
875         return PACKET_NEED_MORE;
876       packetizer->need_sync = FALSE;
877     }
878
879     if (!mpegts_packetizer_map (packetizer, packet_size))
880       return PACKET_NEED_MORE;
881
882     packet_data = &packetizer->map_data[packetizer->map_offset + sync_offset];
883
884     /* Check sync byte */
885     if (G_UNLIKELY (*packet_data != PACKET_SYNC_BYTE)) {
886       GST_DEBUG ("lost sync");
887       packetizer->need_sync = TRUE;
888     } else {
889       /* ALL mpeg-ts variants contain 188 bytes of data. Those with bigger
890        * packet sizes contain either extra data (timesync, FEC, ..) either
891        * before or after the data */
892       packet->data_start = packet_data;
893       packet->data_end = packet->data_start + 188;
894       packet->offset = packetizer->offset;
895       GST_LOG ("offset %" G_GUINT64_FORMAT, packet->offset);
896       packetizer->offset += packet_size;
897       GST_MEMDUMP ("data_start", packet->data_start, 16);
898
899       return mpegts_packetizer_parse_packet (packetizer, packet);
900     }
901   }
902 }
903
904 MpegTSPacketizerPacketReturn
905 mpegts_packetizer_process_next_packet (MpegTSPacketizer2 * packetizer)
906 {
907   MpegTSPacketizerPacket packet;
908   MpegTSPacketizerPacketReturn ret;
909
910   ret = mpegts_packetizer_next_packet (packetizer, &packet);
911   if (ret != PACKET_NEED_MORE)
912     mpegts_packetizer_clear_packet (packetizer, &packet);
913
914   return ret;
915 }
916
917 void
918 mpegts_packetizer_clear_packet (MpegTSPacketizer2 * packetizer,
919     MpegTSPacketizerPacket * packet)
920 {
921   guint8 packet_size = packetizer->packet_size;
922
923   if (packetizer->map_data) {
924     packetizer->map_offset += packet_size;
925     if (packetizer->map_size - packetizer->map_offset < packet_size)
926       mpegts_packetizer_flush_bytes (packetizer, packetizer->map_offset);
927   }
928 }
929
930 gboolean
931 mpegts_packetizer_has_packets (MpegTSPacketizer2 * packetizer)
932 {
933   if (G_UNLIKELY (!packetizer->packet_size)) {
934     if (!mpegts_try_discover_packet_size (packetizer))
935       return FALSE;
936   }
937   return gst_adapter_available (packetizer->adapter) >= packetizer->packet_size;
938 }
939
940 /*
941  * Ideally it should just return a section if:
942  * * The section is complete
943  * * The section is valid (sanity checks for length for example)
944  * * The section applies now (current_next_indicator)
945  * * The section is an update or was never seen
946  *
947  * The section should be a new GstMpegtsSection:
948  * * properly initialized
949  * * With pid, table_id AND section_type set (move logic from mpegtsbase)
950  * * With data copied into it (yes, minor overhead)
951  *
952  * In all other cases it should just return NULL
953  *
954  * If more than one section is available, the 'remaining' field will
955  * be set to the beginning of a valid GList containing other sections.
956  * */
957 GstMpegtsSection *
958 mpegts_packetizer_push_section (MpegTSPacketizer2 * packetizer,
959     MpegTSPacketizerPacket * packet, GList ** remaining)
960 {
961   GstMpegtsSection *section;
962   GstMpegtsSection *res = NULL;
963   MpegTSPacketizerStream *stream;
964   gboolean long_packet;
965   guint8 pointer = 0, table_id;
966   guint16 subtable_extension;
967   gsize to_read;
968   guint section_length;
969   /* data points to the current read location
970    * data_start points to the beginning of the data to accumulate */
971   guint8 *data, *data_start;
972   guint8 packet_cc;
973   GList *others = NULL;
974   guint8 version_number, section_number, last_section_number;
975
976   data = packet->data;
977   packet_cc = FLAGS_CONTINUITY_COUNTER (packet->scram_afc_cc);
978
979   /* Get our filter */
980   stream = packetizer->streams[packet->pid];
981   if (G_UNLIKELY (stream == NULL)) {
982     if (!packet->payload_unit_start_indicator) {
983       /* Early exit (we need to start with a section start) */
984       GST_DEBUG ("PID 0x%04x  waiting for section start", packet->pid);
985       goto out;
986     }
987     stream = mpegts_packetizer_stream_new (packet->pid);
988     packetizer->streams[packet->pid] = stream;
989   }
990
991   GST_MEMDUMP ("Full packet data", packet->data,
992       packet->data_end - packet->data);
993
994   /* This function is split into several parts:
995    *
996    * Pre checks (packet-wide). Determines where we go next
997    * accumulate_data: store data and check if section is complete
998    * section_start: handle beginning of a section, if needed loop back to
999    *                accumulate_data
1000    *
1001    * The trigger that makes the loop stop and return is if:
1002    * 1) We do not have enough data for the current packet
1003    * 2) There is remaining data after a packet which is only made
1004    *    of stuffing bytes (0xff).
1005    *
1006    * Pre-loop checks, related to the whole incoming packet:
1007    *
1008    * If there is a CC-discont:
1009    *  If it is a PUSI, skip the pointer and handle section_start
1010    *  If not a PUSI, reset and return nothing
1011    * If there is not a CC-discont:
1012    *  If it is a PUSI
1013    *    If pointer, accumulate that data and check for complete section
1014    *    (loop)
1015    *  If it is not a PUSI
1016    *    Accumulate the expected data and check for complete section
1017    *    (loop)
1018    *
1019    **/
1020
1021   if (packet->payload_unit_start_indicator) {
1022     pointer = *data++;
1023     /* If the pointer is zero, we're guaranteed to be able to handle it */
1024     if (pointer == 0) {
1025       GST_LOG
1026           ("PID 0x%04x PUSI and pointer == 0, skipping straight to section_start parsing",
1027           packet->pid);
1028       mpegts_packetizer_clear_section (stream);
1029       goto section_start;
1030     }
1031   }
1032
1033   if (stream->continuity_counter == CONTINUITY_UNSET ||
1034       (stream->continuity_counter + 1) % 16 != packet_cc) {
1035     if (stream->continuity_counter != CONTINUITY_UNSET)
1036       GST_WARNING ("PID 0x%04x section discontinuity (%d vs %d)", packet->pid,
1037           stream->continuity_counter, packet_cc);
1038     mpegts_packetizer_clear_section (stream);
1039     /* If not a PUSI, not much we can do */
1040     if (!packet->payload_unit_start_indicator) {
1041       GST_LOG ("PID 0x%04x continuity discont/unset and not PUSI, bailing out",
1042           packet->pid);
1043       goto out;
1044     }
1045     /* If PUSI, skip pointer data and carry on to section start */
1046     data += pointer;
1047     pointer = 0;
1048     GST_LOG ("discont, but PUSI, skipped %d bytes and doing section start",
1049         pointer);
1050     goto section_start;
1051   }
1052
1053   GST_LOG ("Accumulating data from beginning of packet");
1054
1055   data_start = data;
1056
1057 accumulate_data:
1058   /* If not the beginning of a new section, accumulate what we have */
1059   stream->continuity_counter = packet_cc;
1060   to_read = MIN (stream->section_length - stream->section_offset,
1061       packet->data_end - data_start);
1062   memcpy (stream->section_data + stream->section_offset, data_start, to_read);
1063   stream->section_offset += to_read;
1064   /* Point data to after the data we accumulated */
1065   data = data_start + to_read;
1066   GST_DEBUG ("Appending data (need %d, have %d)", stream->section_length,
1067       stream->section_offset);
1068
1069   /* Check if we have enough */
1070   if (stream->section_offset < stream->section_length) {
1071     GST_DEBUG ("PID 0x%04x, section not complete (Got %d, need %d)",
1072         stream->pid, stream->section_offset, stream->section_length);
1073     goto out;
1074   }
1075
1076   /* Small sanity check. We should have collected *exactly* the right amount */
1077   if (G_UNLIKELY (stream->section_offset != stream->section_length))
1078     GST_WARNING ("PID 0x%04x Accumulated too much data (%d vs %d) !",
1079         stream->pid, stream->section_offset, stream->section_length);
1080   GST_DEBUG ("PID 0x%04x Section complete", stream->pid);
1081
1082   if ((section = mpegts_packetizer_parse_section_header (packetizer, stream))) {
1083     if (res)
1084       others = g_list_append (others, section);
1085     else
1086       res = section;
1087   }
1088
1089 section_start:
1090   subtable_extension = 0;
1091   version_number = 0;
1092   last_section_number = 0;
1093   section_number = 0;
1094   table_id = 0;
1095
1096   /* FIXME : We need at least 3 bytes (or 8 for long packets) with current algorithm :(
1097    * We might end up losing sections that start across two packets (srsl...) */
1098   if (data > packet->data_end - 3 || *data == 0xff) {
1099     /* flush stuffing bytes and leave */
1100     mpegts_packetizer_clear_section (stream);
1101     goto out;
1102   }
1103
1104   /* We have more data to process ... */
1105   GST_DEBUG ("PID 0x%04x, More section present in packet (remaining bytes:%"
1106       G_GSIZE_FORMAT ")", stream->pid, (gsize) (packet->data_end - data));
1107   GST_MEMDUMP ("section_start", data, packet->data_end - data);
1108   data_start = data;
1109   /* Beginning of a new section */
1110   /*
1111    * section_syntax_indicator means that the header is of the following format:
1112    * * table_id (8bit)
1113    * * section_syntax_indicator (1bit) == 0
1114    * * reserved/private fields (3bit)
1115    * * section_length (12bit)
1116    * * data (of size section_length)
1117    * * NO CRC !
1118    */
1119   long_packet = data[1] & 0x80;
1120
1121   /* Fast path for short packets */
1122   if (!long_packet) {
1123     /* We can create the section now (function will check for size) */
1124     GST_DEBUG ("Short packet");
1125     section_length = (GST_READ_UINT16_BE (data + 1) & 0xfff) + 3;
1126     /* Only do fast-path if we have enough byte */
1127     if (data + section_length <= packet->data_end) {
1128       if ((section =
1129               gst_mpegts_section_new (packet->pid, g_memdup2 (data,
1130                       section_length), section_length))) {
1131         GST_DEBUG ("PID 0x%04x Short section complete !", packet->pid);
1132         section->offset = packet->offset;
1133         if (res)
1134           others = g_list_append (others, section);
1135         else
1136           res = section;
1137       }
1138       /* Advance reader and potentially read another section */
1139       data += section_length;
1140       if (data < packet->data_end && *data != 0xff)
1141         goto section_start;
1142       /* If not, exit */
1143       goto out;
1144     }
1145     /* We don't have enough bytes to do short section shortcut */
1146   }
1147
1148   /* Beginning of a new section, do as much pre-parsing as possible */
1149   /* table_id                        : 8  bit */
1150   table_id = *data++;
1151
1152   /* section_syntax_indicator        : 1  bit
1153    * other_fields (reserved)         : 3  bit
1154    * section_length                  : 12 bit */
1155   section_length = (GST_READ_UINT16_BE (data) & 0x0FFF) + 3;
1156   data += 2;
1157
1158   if (long_packet) {
1159     /* Do we have enough data for a long packet? */
1160     if (data > packet->data_end - 5)
1161       goto out;
1162
1163     /* subtable_extension (always present, we are in a long section) */
1164     /* subtable extension              : 16 bit */
1165     subtable_extension = GST_READ_UINT16_BE (data);
1166     data += 2;
1167
1168     /* reserved                      : 2  bit
1169      * version_number                : 5  bit
1170      * current_next_indicator        : 1  bit */
1171     /* Bail out now if current_next_indicator == 0 */
1172     if (G_UNLIKELY (!(*data & 0x01))) {
1173       GST_DEBUG
1174           ("PID 0x%04x table_id 0x%02x section does not apply (current_next_indicator == 0)",
1175           packet->pid, table_id);
1176       goto out;
1177     }
1178
1179     version_number = *data++ >> 1 & 0x1f;
1180     /* section_number                : 8  bit */
1181     section_number = *data++;
1182     /* last_section_number                : 8  bit */
1183     last_section_number = *data++;
1184   } else {
1185     subtable_extension = 0;
1186     version_number = 0;
1187     section_number = 0;
1188     last_section_number = 0;
1189   }
1190   GST_DEBUG
1191       ("PID 0x%04x length:%d table_id:0x%02x subtable_extension:0x%04x version_number:%d section_number:%d(last:%d)",
1192       packet->pid, section_length, table_id, subtable_extension, version_number,
1193       section_number, last_section_number);
1194
1195   to_read = MIN (section_length, packet->data_end - data_start);
1196
1197   /* Check as early as possible whether we already saw this section
1198    * i.e. that we saw a subtable with:
1199    * * same subtable_extension (might be zero)
1200    * * same version_number
1201    * * same last_section_number
1202    * * same section_number was seen
1203    */
1204   if (seen_section_before (stream, table_id, subtable_extension,
1205           version_number, section_number, last_section_number, data_start,
1206           to_read)) {
1207     GST_DEBUG
1208         ("PID 0x%04x Already processed table_id:0x%02x subtable_extension:0x%04x, version_number:%d, section_number:%d",
1209         packet->pid, table_id, subtable_extension, version_number,
1210         section_number);
1211     /* skip data and see if we have more sections after */
1212     data = data_start + to_read;
1213     if (data == packet->data_end || *data == 0xff)
1214       goto out;
1215     goto section_start;
1216   }
1217   if (G_UNLIKELY (section_number > last_section_number)) {
1218     GST_WARNING
1219         ("PID 0x%04x corrupted packet (section_number:%d > last_section_number:%d)",
1220         packet->pid, section_number, last_section_number);
1221     goto out;
1222   }
1223
1224
1225   /* Copy over already parsed values */
1226   stream->table_id = table_id;
1227   stream->section_length = section_length;
1228   stream->version_number = version_number;
1229   stream->subtable_extension = subtable_extension;
1230   stream->section_number = section_number;
1231   stream->last_section_number = last_section_number;
1232   stream->offset = packet->offset;
1233
1234   /* Create enough room to store chunks of sections */
1235   stream->section_data = g_malloc (stream->section_length);
1236   stream->section_offset = 0;
1237
1238   /* Finally, accumulate and check if we parsed enough */
1239   goto accumulate_data;
1240
1241 out:
1242   packet->data = data;
1243   *remaining = others;
1244
1245   GST_DEBUG ("result: %p", res);
1246
1247   return res;
1248 }
1249
1250 static void
1251 _init_local (void)
1252 {
1253   GST_DEBUG_CATEGORY_INIT (mpegts_packetizer_debug, "mpegtspacketizer", 0,
1254       "MPEG transport stream parser");
1255 }
1256
1257
1258 static void
1259 mpegts_packetizer_resync (MpegTSPCR * pcr, GstClockTime time,
1260     GstClockTime gstpcrtime, gboolean reset_skew)
1261 {
1262   pcr->base_time = time;
1263   pcr->base_pcrtime = gstpcrtime;
1264   pcr->prev_out_time = GST_CLOCK_TIME_NONE;
1265   pcr->prev_send_diff = GST_CLOCK_TIME_NONE;
1266   if (reset_skew) {
1267     pcr->window_filling = TRUE;
1268     pcr->window_pos = 0;
1269     pcr->window_min = 0;
1270     pcr->window_size = 0;
1271     pcr->skew = 0;
1272   }
1273 }
1274
1275
1276 /* Code mostly copied from -good/gst/rtpmanager/rtpjitterbuffer.c */
1277
1278 /* For the clock skew we use a windowed low point averaging algorithm as can be
1279  * found in Fober, Orlarey and Letz, 2005, "Real Time Clock Skew Estimation
1280  * over Network Delays":
1281  * http://www.grame.fr/Ressources/pub/TR-050601.pdf
1282  * http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1546
1283  *
1284  * The idea is that the jitter is composed of:
1285  *
1286  *  J = N + n
1287  *
1288  *   D   : a constant network delay.
1289  *   n   : random added noise. The noise is concentrated around 0
1290  *
1291  * In the receiver we can track the elapsed time at the sender with:
1292  *
1293  *  send_diff(i) = (Tsi - Ts0);
1294  *
1295  *   Tsi : The time at the sender at packet i
1296  *   Ts0 : The time at the sender at the first packet
1297  *
1298  * This is the difference between the RTP timestamp in the first received packet
1299  * and the current packet.
1300  *
1301  * At the receiver we have to deal with the jitter introduced by the network.
1302  *
1303  *  recv_diff(i) = (Tri - Tr0)
1304  *
1305  *   Tri : The time at the receiver at packet i
1306  *   Tr0 : The time at the receiver at the first packet
1307  *
1308  * Both of these values contain a jitter Ji, a jitter for packet i, so we can
1309  * write:
1310  *
1311  *  recv_diff(i) = (Cri + D + ni) - (Cr0 + D + n0))
1312  *
1313  *    Cri    : The time of the clock at the receiver for packet i
1314  *    D + ni : The jitter when receiving packet i
1315  *
1316  * We see that the network delay is irrelevant here as we can eliminate D:
1317  *
1318  *  recv_diff(i) = (Cri + ni) - (Cr0 + n0))
1319  *
1320  * The drift is now expressed as:
1321  *
1322  *  Drift(i) = recv_diff(i) - send_diff(i);
1323  *
1324  * We now keep the W latest values of Drift and find the minimum (this is the
1325  * one with the lowest network jitter and thus the one which is least affected
1326  * by it). We average this lowest value to smooth out the resulting network skew.
1327  *
1328  * Both the window and the weighting used for averaging influence the accuracy
1329  * of the drift estimation. Finding the correct parameters turns out to be a
1330  * compromise between accuracy and inertia.
1331  *
1332  * We use a 2 second window or up to 512 data points, which is statistically big
1333  * enough to catch spikes (FIXME, detect spikes).
1334  * We also use a rather large weighting factor (125) to smoothly adapt. During
1335  * startup, when filling the window, we use a parabolic weighting factor, the
1336  * more the window is filled, the faster we move to the detected possible skew.
1337  *
1338  * Returns: @time adjusted with the clock skew.
1339  */
1340 static GstClockTime
1341 calculate_skew (MpegTSPacketizer2 * packetizer,
1342     MpegTSPCR * pcr, guint64 pcrtime, GstClockTime time)
1343 {
1344   guint64 send_diff, recv_diff;
1345   gint64 delta;
1346   gint64 old;
1347   gint pos, i;
1348   GstClockTime gstpcrtime, out_time;
1349 #ifndef GST_DISABLE_GST_DEBUG
1350   guint64 slope;
1351 #endif
1352
1353   gstpcrtime = PCRTIME_TO_GSTTIME (pcrtime) + pcr->pcroffset;
1354
1355   /* first time, lock on to time and gstpcrtime */
1356   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (pcr->base_time))) {
1357     pcr->base_time = time;
1358     pcr->prev_out_time = GST_CLOCK_TIME_NONE;
1359     GST_DEBUG ("Taking new base time %" GST_TIME_FORMAT, GST_TIME_ARGS (time));
1360   }
1361
1362   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (pcr->base_pcrtime))) {
1363     pcr->base_pcrtime = gstpcrtime;
1364     pcr->prev_send_diff = -1;
1365     GST_DEBUG ("Taking new base pcrtime %" GST_TIME_FORMAT,
1366         GST_TIME_ARGS (gstpcrtime));
1367   }
1368
1369   /* Handle PCR wraparound and resets */
1370   if (GST_CLOCK_TIME_IS_VALID (pcr->last_pcrtime) &&
1371       gstpcrtime < pcr->last_pcrtime) {
1372     if (pcr->last_pcrtime - gstpcrtime > PCR_GST_MAX_VALUE / 2) {
1373       /* PCR wraparound */
1374       GST_DEBUG ("PCR wrap");
1375       pcr->pcroffset += PCR_GST_MAX_VALUE;
1376       gstpcrtime = PCRTIME_TO_GSTTIME (pcrtime) + pcr->pcroffset;
1377       send_diff = gstpcrtime - pcr->base_pcrtime;
1378     } else if (GST_CLOCK_TIME_IS_VALID (time)
1379         && pcr->last_pcrtime - gstpcrtime > 15 * GST_SECOND) {
1380       /* Time jumped backward by > 15 seconds, and we have a timestamp
1381        * to use to close the discont. Assume a reset */
1382       GST_DEBUG ("PCR reset");
1383       /* Calculate PCR we would have expected for the given input time,
1384        * essentially applying the reverse correction process
1385        *
1386        * We want to find the PCR offset to apply
1387        *   pcroffset = (corrected) gstpcrtime - (received) gstpcrtime
1388        *
1389        * send_diff = (corrected) gstpcrtime - pcr->base_pcrtime
1390        * recv_diff = time - pcr->base_time
1391        * out_time = pcr->base_time + send_diff
1392        *
1393        * We are assuming that send_diff == recv_diff
1394        *   (corrected) gstpcrtime - pcr->base_pcrtime = time - pcr->base_time
1395        * Giving us:
1396        *   (corrected) gstpcrtime = time - pcr->base_time + pcr->base_pcrtime
1397        *
1398        * And therefore:
1399        *   pcroffset = time - pcr->base_time + pcr->base_pcrtime - (received) gstpcrtime
1400        **/
1401       pcr->pcroffset += time - pcr->base_time + pcr->base_pcrtime - gstpcrtime;
1402       gstpcrtime = PCRTIME_TO_GSTTIME (pcrtime) + pcr->pcroffset;
1403       send_diff = gstpcrtime - pcr->base_pcrtime;
1404       GST_DEBUG ("Introduced offset is now %" GST_TIME_FORMAT
1405           " corrected pcr time %" GST_TIME_FORMAT,
1406           GST_TIME_ARGS (pcr->pcroffset), GST_TIME_ARGS (gstpcrtime));
1407     } else {
1408       /* Small jumps backward, assume some arrival jitter and skip it */
1409       send_diff = 0;
1410
1411       /* The following code are the different ways we deal with small-ish
1412        * jitter, ranging in severity from "can be ignored" to "this needs a full
1413        * resync" */
1414
1415       if (time == pcr->base_time) {
1416         /* If this comes from a non-fully-timestamped source (i.e. adaptive
1417          * streams), then cope with the fact that some producers generate utter
1418          * PCR garbage on fragment ends.
1419          *
1420          * We detect this comes from a non-fully-timestamped source by the fact
1421          * that the buffer time never changes */
1422         GST_DEBUG ("Ignoring PCR resets on non-fully timestamped stream");
1423       } else if (pcr->last_pcrtime - gstpcrtime < GST_SECOND) {
1424         GST_WARNING
1425             ("(small) backward timestamps at server or no buffer timestamps. Ignoring.");
1426         /* This will trigger the no_skew logic before but leave other state
1427          * intact */
1428         time = GST_CLOCK_TIME_NONE;
1429       } else {
1430         /* A bigger backward step than packet out-of-order can account for. Reset base PCR time
1431          * to be resynched the next time we see a PCR */
1432         GST_WARNING
1433             ("backward timestamps at server or no buffer timestamps. Resync base PCR");
1434         pcr->base_pcrtime = GST_CLOCK_TIME_NONE;
1435       }
1436     }
1437   } else
1438     send_diff = gstpcrtime - pcr->base_pcrtime;
1439
1440   GST_DEBUG ("gstpcr %" GST_TIME_FORMAT ", buftime %" GST_TIME_FORMAT
1441       ", base %" GST_TIME_FORMAT ", send_diff %" GST_TIME_FORMAT,
1442       GST_TIME_ARGS (gstpcrtime), GST_TIME_ARGS (time),
1443       GST_TIME_ARGS (pcr->base_pcrtime), GST_TIME_ARGS (send_diff));
1444
1445   /* keep track of the last extended pcrtime */
1446   pcr->last_pcrtime = gstpcrtime;
1447
1448   /* we don't have an arrival timestamp so we can't do skew detection. we
1449    * should still apply a timestamp based on RTP timestamp and base_time */
1450   if (!GST_CLOCK_TIME_IS_VALID (time)
1451       || !GST_CLOCK_TIME_IS_VALID (pcr->base_time))
1452     goto no_skew;
1453
1454   /* elapsed time at receiver, includes the jitter */
1455   recv_diff = time - pcr->base_time;
1456
1457   /* Ignore packets received at 100% the same time (i.e. from the same input buffer) */
1458   if (G_UNLIKELY (time == pcr->prev_in_time
1459           && GST_CLOCK_TIME_IS_VALID (pcr->prev_in_time)))
1460     goto no_skew;
1461
1462   /* measure the diff */
1463   delta = ((gint64) recv_diff) - ((gint64) send_diff);
1464
1465 #ifndef GST_DISABLE_GST_DEBUG
1466   /* measure the slope, this gives a rought estimate between the sender speed
1467    * and the receiver speed. This should be approximately 8, higher values
1468    * indicate a burst (especially when the connection starts) */
1469   slope = recv_diff > 0 ? (send_diff * 8) / recv_diff : 8;
1470 #endif
1471
1472   GST_DEBUG ("time %" GST_TIME_FORMAT ", base %" GST_TIME_FORMAT
1473       ", recv_diff %" GST_TIME_FORMAT ", slope %" G_GUINT64_FORMAT,
1474       GST_TIME_ARGS (time), GST_TIME_ARGS (pcr->base_time),
1475       GST_TIME_ARGS (recv_diff), slope);
1476
1477   /* if the difference between the sender timeline and the receiver timeline
1478    * changed too quickly we have to resync because the server likely restarted
1479    * its timestamps. */
1480   if (ABS (delta - pcr->skew) > packetizer->pcr_discont_threshold) {
1481     GST_WARNING ("delta - skew: %" GST_STIME_FORMAT " too big, reset skew",
1482         GST_STIME_ARGS (delta - pcr->skew));
1483     mpegts_packetizer_resync (pcr, time, gstpcrtime, TRUE);
1484     send_diff = 0;
1485     delta = 0;
1486   }
1487
1488   pos = pcr->window_pos;
1489
1490   if (G_UNLIKELY (pcr->window_filling)) {
1491     /* we are filling the window */
1492     GST_DEBUG ("filling %d, delta %" G_GINT64_FORMAT, pos, delta);
1493     pcr->window[pos++] = delta;
1494     /* calc the min delta we observed */
1495     if (G_UNLIKELY (pos == 1 || delta < pcr->window_min))
1496       pcr->window_min = delta;
1497
1498     if (G_UNLIKELY (send_diff >= MAX_TIME || pos >= MAX_WINDOW)) {
1499       pcr->window_size = pos;
1500
1501       /* window filled */
1502       GST_DEBUG ("min %" G_GINT64_FORMAT, pcr->window_min);
1503
1504       /* the skew is now the min */
1505       pcr->skew = pcr->window_min;
1506       pcr->window_filling = FALSE;
1507     } else {
1508       gint perc_time, perc_window, perc;
1509
1510       /* figure out how much we filled the window, this depends on the amount of
1511        * time we have or the max number of points we keep. */
1512       perc_time = send_diff * 100 / MAX_TIME;
1513       perc_window = pos * 100 / MAX_WINDOW;
1514       perc = MAX (perc_time, perc_window);
1515
1516       /* make a parabolic function, the closer we get to the MAX, the more value
1517        * we give to the scaling factor of the new value */
1518       perc = perc * perc;
1519
1520       /* quickly go to the min value when we are filling up, slowly when we are
1521        * just starting because we're not sure it's a good value yet. */
1522       pcr->skew =
1523           (perc * pcr->window_min + ((10000 - perc) * pcr->skew)) / 10000;
1524       pcr->window_size = pos + 1;
1525     }
1526   } else {
1527     /* pick old value and store new value. We keep the previous value in order
1528      * to quickly check if the min of the window changed */
1529     old = pcr->window[pos];
1530     pcr->window[pos++] = delta;
1531
1532     if (G_UNLIKELY (delta <= pcr->window_min)) {
1533       /* if the new value we inserted is smaller or equal to the current min,
1534        * it becomes the new min */
1535       pcr->window_min = delta;
1536     } else if (G_UNLIKELY (old == pcr->window_min)) {
1537       gint64 min = G_MAXINT64;
1538
1539       /* if we removed the old min, we have to find a new min */
1540       for (i = 0; i < pcr->window_size; i++) {
1541         /* we found another value equal to the old min, we can stop searching now */
1542         if (pcr->window[i] == old) {
1543           min = old;
1544           break;
1545         }
1546         if (pcr->window[i] < min)
1547           min = pcr->window[i];
1548       }
1549       pcr->window_min = min;
1550     }
1551     /* average the min values */
1552     pcr->skew = (pcr->window_min + (124 * pcr->skew)) / 125;
1553     GST_DEBUG ("delta %" G_GINT64_FORMAT ", new min: %" G_GINT64_FORMAT,
1554         delta, pcr->window_min);
1555   }
1556   /* wrap around in the window */
1557   if (G_UNLIKELY (pos >= pcr->window_size))
1558     pos = 0;
1559
1560   pcr->window_pos = pos;
1561
1562 no_skew:
1563   /* the output time is defined as the base timestamp plus the PCR time
1564    * adjusted for the clock skew .*/
1565   if (pcr->base_time != -1) {
1566     out_time = pcr->base_time + send_diff;
1567     /* skew can be negative and we don't want to make invalid timestamps */
1568     if (pcr->skew < 0 && out_time < -pcr->skew) {
1569       out_time = 0;
1570     } else {
1571       out_time += pcr->skew;
1572     }
1573     /* check if timestamps are not going backwards, we can only check this if we
1574      * have a previous out time and a previous send_diff */
1575     if (G_LIKELY (pcr->prev_out_time != -1 && pcr->prev_send_diff != -1)) {
1576       /* now check for backwards timestamps */
1577       if (G_UNLIKELY (
1578               /* if the server timestamps went up and the out_time backwards */
1579               (send_diff > pcr->prev_send_diff
1580                   && out_time < pcr->prev_out_time) ||
1581               /* if the server timestamps went backwards and the out_time forwards */
1582               (send_diff < pcr->prev_send_diff
1583                   && out_time > pcr->prev_out_time) ||
1584               /* if the server timestamps did not change */
1585               send_diff == pcr->prev_send_diff)) {
1586         GST_DEBUG ("backwards timestamps, using previous time");
1587         out_time = GSTTIME_TO_MPEGTIME (out_time);
1588       }
1589     }
1590   } else {
1591     /* We simply use the pcrtime without applying any skew compensation */
1592     out_time = time;
1593   }
1594
1595   pcr->prev_out_time = out_time;
1596   pcr->prev_in_time = time;
1597   pcr->prev_send_diff = send_diff;
1598
1599   GST_DEBUG ("skew %" G_GINT64_FORMAT ", out %" GST_TIME_FORMAT,
1600       pcr->skew, GST_TIME_ARGS (out_time));
1601
1602   return out_time;
1603 }
1604
1605 static void
1606 _reevaluate_group_pcr_offset (MpegTSPCR * pcrtable, PCROffsetGroup * group)
1607 {
1608   PCROffsetGroup *prev = NULL;
1609 #ifndef GST_DISABLE_GST_DEBUG
1610   PCROffsetGroup *first = pcrtable->groups->data;
1611 #endif
1612   PCROffsetCurrent *current = pcrtable->current;
1613   GList *tmp;
1614
1615   /* Go over all ESTIMATED groups until the target group */
1616   for (tmp = pcrtable->groups; tmp; tmp = tmp->next) {
1617     PCROffsetGroup *cur = (PCROffsetGroup *) tmp->data;
1618
1619     /* Skip groups that don't need re-evaluation */
1620     if (!(cur->flags & PCR_GROUP_FLAG_ESTIMATED)) {
1621       GST_DEBUG ("Skipping group %p pcr_offset (currently %" GST_TIME_FORMAT
1622           ")", cur, GST_TIME_ARGS (PCRTIME_TO_GSTTIME (cur->pcr_offset)));
1623       prev = cur;
1624       continue;
1625     }
1626
1627     /* This should not happen ! The first group is *always* correct (zero) */
1628     if (G_UNLIKELY (prev == NULL)) {
1629       GST_ERROR ("First PCR Group was not estimated (bug). Setting to zero");
1630       cur->pcr_offset = 0;
1631       cur->flags &= ~PCR_GROUP_FLAG_ESTIMATED;
1632       return;
1633     }
1634
1635     /* Finally do the estimation of this group's PCR offset based on the
1636      * previous group information */
1637
1638     GST_DEBUG ("Re-evaluating group %p pcr_offset (currently %" GST_TIME_FORMAT
1639         ")", group, GST_TIME_ARGS (PCRTIME_TO_GSTTIME (cur->pcr_offset)));
1640
1641     GST_DEBUG ("cur->first_pcr:%" GST_TIME_FORMAT " prev->first_pcr:%"
1642         GST_TIME_FORMAT, GST_TIME_ARGS (PCRTIME_TO_GSTTIME (cur->first_pcr)),
1643         GST_TIME_ARGS (PCRTIME_TO_GSTTIME (prev->first_pcr)));
1644
1645     if (G_UNLIKELY (cur->first_pcr < prev->first_pcr)) {
1646       guint64 prevbr, lastbr;
1647       guint64 prevpcr;
1648       guint64 prevoffset, lastoffset;
1649
1650       /* Take the previous group pcr_offset and figure out how much to add
1651        * to it for the current group */
1652
1653       /* Right now we do a dumb bitrate estimation
1654        * estimate bitrate (prev - first) : bitrate from the start
1655        * estimate bitrate (prev) : bitrate of previous group
1656        * estimate bitrate (last - first) : bitrate from previous group
1657        *
1658        * We will use raw (non-corrected/non-absolute) PCR values in a first time
1659        * to detect wraparound/resets/gaps...
1660        *
1661        * We will use the corrected/absolute PCR values to calculate
1662        * bitrate and estimate the target group pcr_offset.
1663        * */
1664
1665       /* If the current window estimator is over the previous group, used those
1666        * values as the latest (since they are more recent) */
1667       if (current->group == prev && current->pending[current->last].offset) {
1668         prevoffset =
1669             current->pending[current->last].offset + prev->first_offset;
1670         prevpcr = current->pending[current->last].pcr + prev->first_pcr;
1671         /* prevbr: bitrate(prev) */
1672         prevbr =
1673             gst_util_uint64_scale (PCR_SECOND,
1674             current->pending[current->last].offset,
1675             current->pending[current->last].pcr);
1676         GST_DEBUG ("Previous group bitrate (%" G_GUINT64_FORMAT " / %"
1677             GST_TIME_FORMAT ") : %" G_GUINT64_FORMAT,
1678             current->pending[current->last].offset,
1679             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (current->pending[current->last].
1680                     pcr)), prevbr);
1681       } else if (prev->values[prev->last_value].offset) {
1682         prevoffset = prev->values[prev->last_value].offset + prev->first_offset;
1683         prevpcr = prev->values[prev->last_value].pcr + prev->first_pcr;
1684         /* prevbr: bitrate(prev) (FIXME : Cache) */
1685         prevbr =
1686             gst_util_uint64_scale (PCR_SECOND,
1687             prev->values[prev->last_value].offset,
1688             prev->values[prev->last_value].pcr);
1689         GST_DEBUG ("Previous group bitrate (%" G_GUINT64_FORMAT " / %"
1690             GST_TIME_FORMAT ") : %" G_GUINT64_FORMAT,
1691             prev->values[prev->last_value].offset,
1692             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (prev->values[prev->last_value].
1693                     pcr)), prevbr);
1694       } else {
1695         GST_DEBUG ("Using overall bitrate");
1696         prevoffset = prev->values[prev->last_value].offset + prev->first_offset;
1697         prevpcr = prev->values[prev->last_value].pcr + prev->first_pcr;
1698         prevbr = gst_util_uint64_scale (PCR_SECOND,
1699             prev->first_offset, prev->pcr_offset);
1700       }
1701       lastoffset = cur->values[cur->last_value].offset + cur->first_offset;
1702
1703       GST_DEBUG ("Offset first:%" G_GUINT64_FORMAT " prev:%" G_GUINT64_FORMAT
1704           " cur:%" G_GUINT64_FORMAT, first->first_offset, prevoffset,
1705           lastoffset);
1706       GST_DEBUG ("PCR first:%" GST_TIME_FORMAT " prev:%" GST_TIME_FORMAT
1707           " cur:%" GST_TIME_FORMAT,
1708           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (first->first_pcr)),
1709           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (prevpcr)),
1710           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (cur->values[cur->last_value].pcr +
1711                   cur->first_pcr)));
1712
1713       if (prevpcr - cur->first_pcr > (PCR_MAX_VALUE * 9 / 10)) {
1714         gfloat diffprev;
1715         guint64 guess_offset;
1716
1717         /* Let's assume there is a PCR wraparound between the previous and current
1718          * group.
1719          * [ prev ]... PCR_MAX | 0 ...[ current ]
1720          * The estimated pcr_offset would therefore be:
1721          * current.first + (PCR_MAX_VALUE - prev.first)
1722          *
1723          * 1) Check if bitrate(prev) would be consistent with bitrate (cur - prev)
1724          */
1725         guess_offset = PCR_MAX_VALUE - prev->first_pcr + cur->first_pcr;
1726         lastbr = gst_util_uint64_scale (PCR_SECOND, lastoffset - prevoffset,
1727             guess_offset + cur->values[cur->last_value].pcr - (prevpcr -
1728                 prev->first_pcr));
1729         GST_DEBUG ("Wraparound prev-cur (guess_offset:%" GST_TIME_FORMAT
1730             ") bitrate:%" G_GUINT64_FORMAT,
1731             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (guess_offset)), lastbr);
1732         diffprev = (float) 100.0 *(ABSDIFF (prevbr, lastbr)) / (float) prevbr;
1733         GST_DEBUG ("Difference with previous bitrate:%f", diffprev);
1734         if (diffprev < 10.0) {
1735           GST_DEBUG ("Difference < 10.0, Setting pcr_offset to %"
1736               G_GUINT64_FORMAT, guess_offset);
1737           cur->pcr_offset = guess_offset;
1738           if (diffprev < 1.0) {
1739             GST_DEBUG ("Difference < 1.0, Removing ESTIMATED flags");
1740             cur->flags &= ~PCR_GROUP_FLAG_ESTIMATED;
1741           }
1742         }
1743         /* Indicate the the previous group is before a wrapover */
1744         prev->flags |= PCR_GROUP_FLAG_WRAPOVER;
1745       } else {
1746         guint64 resetprev;
1747         /* Let's assume there was a PCR reset between the previous and current
1748          * group
1749          * [ prev ] ... x | x - reset ... [ current ]
1750          *
1751          * The estimated pcr_offset would then be
1752          * = current.first - (x - reset) + (x - prev.first) + 100ms (for safety)
1753          * = current.first + reset - prev.first + 100ms (for safety)
1754          */
1755         /* In order to calculate the reset, we estimate what the PCR would have
1756          * been by using prevbr */
1757         /* FIXME : Which bitrate should we use ???  */
1758         GST_DEBUG ("Using prevbr:%" G_GUINT64_FORMAT " and taking offsetdiff:%"
1759             G_GUINT64_FORMAT, prevbr, cur->first_offset - prev->first_offset);
1760         resetprev =
1761             gst_util_uint64_scale (PCR_SECOND,
1762             cur->first_offset - prev->first_offset, prevbr);
1763         GST_DEBUG ("Estimated full PCR for offset %" G_GUINT64_FORMAT
1764             ", using prevbr:%"
1765             GST_TIME_FORMAT, cur->first_offset,
1766             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (resetprev)));
1767         cur->pcr_offset = prev->pcr_offset + resetprev + 100 * PCR_MSECOND;
1768         GST_DEBUG ("Adjusted group PCR_offset to %" GST_TIME_FORMAT,
1769             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (cur->pcr_offset)));
1770         /* Indicate the the previous group is before a reset */
1771         prev->flags |= PCR_GROUP_FLAG_RESET;
1772       }
1773     } else {
1774       /* FIXME : Detect gaps if bitrate difference is really too big ? */
1775       cur->pcr_offset = prev->pcr_offset + cur->first_pcr - prev->first_pcr;
1776       GST_DEBUG ("Assuming there is no gap, setting pcr_offset to %"
1777           GST_TIME_FORMAT,
1778           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (cur->pcr_offset)));
1779       /* Remove the reset and wrapover flag (if it was previously there) */
1780       prev->flags &= ~PCR_GROUP_FLAG_RESET;
1781       prev->flags &= ~PCR_GROUP_FLAG_WRAPOVER;
1782     }
1783
1784
1785     /* Remember prev for the next group evaluation */
1786     prev = cur;
1787   }
1788 }
1789
1790 static PCROffsetGroup *
1791 _new_group (guint64 pcr, guint64 offset, guint64 pcr_offset, guint flags)
1792 {
1793   PCROffsetGroup *group = g_slice_new0 (PCROffsetGroup);
1794
1795   GST_DEBUG ("Input PCR %" GST_TIME_FORMAT " offset:%" G_GUINT64_FORMAT
1796       " pcr_offset:%" G_GUINT64_FORMAT " flags:%d",
1797       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (pcr)), offset, pcr_offset, flags);
1798
1799   group->flags = flags;
1800   group->values = g_new0 (PCROffset, DEFAULT_ALLOCATED_OFFSET);
1801   /* The first pcr/offset diff is always 0/0 */
1802   group->values[0].pcr = group->values[0].offset = 0;
1803   group->nb_allocated = DEFAULT_ALLOCATED_OFFSET;
1804
1805   /* Store the full values */
1806   group->first_pcr = pcr;
1807   group->first_offset = offset;
1808   group->pcr_offset = pcr_offset;
1809
1810   GST_DEBUG ("Created group starting with pcr:%" GST_TIME_FORMAT " offset:%"
1811       G_GUINT64_FORMAT " pcr_offset:%" GST_TIME_FORMAT,
1812       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->first_pcr)),
1813       group->first_offset,
1814       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->pcr_offset)));
1815
1816   return group;
1817 }
1818
1819 static void
1820 _insert_group_after (MpegTSPCR * pcrtable, PCROffsetGroup * group,
1821     PCROffsetGroup * prev)
1822 {
1823   if (prev == NULL) {
1824     /* First group */
1825     pcrtable->groups = g_list_prepend (pcrtable->groups, group);
1826   } else {
1827     GList *tmp, *toinsert, *prevlist = NULL, *nextlist = NULL;
1828     /* Insert before next and prev */
1829     for (tmp = pcrtable->groups; tmp; tmp = tmp->next) {
1830       if (tmp->data == prev) {
1831         prevlist = tmp;
1832         nextlist = tmp->next;
1833         break;
1834       }
1835     }
1836     if (!prevlist) {
1837       /* The non NULL prev given isn't in the list */
1838       GST_WARNING ("Request to insert before a group which isn't in the list");
1839       pcrtable->groups = g_list_prepend (pcrtable->groups, group);
1840     } else {
1841       toinsert = g_list_append (NULL, group);
1842       toinsert->next = nextlist;
1843       toinsert->prev = prevlist;
1844       prevlist->next = toinsert;
1845       if (nextlist)
1846         nextlist->prev = toinsert;
1847     }
1848   }
1849 }
1850
1851 static void
1852 _use_group (MpegTSPCR * pcrtable, PCROffsetGroup * group)
1853 {
1854   PCROffsetCurrent *current = pcrtable->current;
1855
1856   memset (current, 0, sizeof (PCROffsetCurrent));
1857   current->group = group;
1858   current->pending[0] = group->values[group->last_value];
1859   current->last_value = current->pending[0];
1860   current->write = 1;
1861   current->prev = group->values[group->last_value];
1862   current->first_pcr = group->first_pcr;
1863   current->first_offset = group->first_offset;
1864 }
1865
1866 /* Create a new group with the specified values after prev
1867  * Set current to that new group */
1868 static void
1869 _set_current_group (MpegTSPCR * pcrtable,
1870     PCROffsetGroup * prev, guint64 pcr, guint64 offset, gboolean contiguous)
1871 {
1872   PCROffsetGroup *group;
1873   guint flags = 0;
1874   guint64 pcr_offset = 0;
1875
1876   /* Handle wraparound/gap (only if contiguous with previous group) */
1877   if (contiguous) {
1878     guint64 lastpcr = prev->first_pcr + prev->values[prev->last_value].pcr;
1879
1880     /* Set CLOSED flag on previous group and remember pcr_offset */
1881     prev->flags |= PCR_GROUP_FLAG_CLOSED;
1882     pcr_offset = prev->pcr_offset;
1883
1884     /* Wraparound ? */
1885     if (lastpcr > pcr) {
1886       /* In offset-mode, a PCR wraparound is only actually consistent if
1887        * we have a very high confidence (99% right now, might need to change
1888        * later) */
1889       if (lastpcr - pcr > (PCR_MAX_VALUE * 99 / 100)) {
1890         GST_WARNING ("WRAPAROUND detected. diff %" GST_TIME_FORMAT,
1891             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (lastpcr - pcr)));
1892         /* The previous group closed at PCR_MAX_VALUE */
1893         pcr_offset += PCR_MAX_VALUE - prev->first_pcr + pcr;
1894       } else {
1895         GST_WARNING ("RESET detected. diff %" GST_TIME_FORMAT,
1896             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (lastpcr - pcr)));
1897         /* The previous group closed at the raw last_pcr diff (+100ms for safety) */
1898         pcr_offset += prev->values[prev->last_value].pcr + 100 * PCR_MSECOND;
1899       }
1900     } else if (lastpcr < pcr - 500 * PCR_MSECOND) {
1901       GST_WARNING ("GAP detected. diff %" GST_TIME_FORMAT,
1902           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (pcr - lastpcr)));
1903       /* The previous group closed at the raw last_pcr diff (+500ms for safety) */
1904       pcr_offset += prev->values[prev->last_value].pcr + 500 * PCR_MSECOND;
1905     } else
1906       /* Normal continuation (contiguous in time) */
1907       pcr_offset += pcr - prev->first_pcr;
1908
1909   } else if (prev != NULL)
1910     /* If we are not contiguous and it's not the first group, the pcr_offset
1911      * will be estimated */
1912     flags = PCR_GROUP_FLAG_ESTIMATED;
1913
1914   group = _new_group (pcr, offset, pcr_offset, flags);
1915   _use_group (pcrtable, group);
1916   _insert_group_after (pcrtable, group, prev);
1917   if (!contiguous)
1918     _reevaluate_group_pcr_offset (pcrtable, group);
1919 }
1920
1921 static inline void
1922 _append_group_values (PCROffsetGroup * group, PCROffset pcroffset)
1923 {
1924   /* Only append if new values */
1925   if (group->values[group->last_value].offset == pcroffset.offset &&
1926       group->values[group->last_value].pcr == pcroffset.pcr) {
1927     GST_DEBUG ("Same values, ignoring");
1928   } else {
1929     group->last_value++;
1930     /* Resize values if needed */
1931     if (G_UNLIKELY (group->nb_allocated == group->last_value)) {
1932       group->nb_allocated += DEFAULT_ALLOCATED_OFFSET;
1933       group->values =
1934           g_realloc (group->values, group->nb_allocated * sizeof (PCROffset));
1935     }
1936     group->values[group->last_value] = pcroffset;
1937   }
1938
1939   GST_DEBUG ("First PCR:%" GST_TIME_FORMAT " offset:%" G_GUINT64_FORMAT
1940       " PCR_offset:%" GST_TIME_FORMAT,
1941       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->first_pcr)),
1942       group->first_offset,
1943       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->pcr_offset)));
1944   GST_DEBUG ("Last PCR: +%" GST_TIME_FORMAT " offset: +%" G_GUINT64_FORMAT,
1945       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (pcroffset.pcr)), pcroffset.offset);
1946 }
1947
1948 /* Move last values from current (if any) to the current group
1949  * and reset current.
1950  * Note: This does not set the CLOSED flag (since we have no next
1951  * contiguous group) */
1952 static void
1953 _close_current_group (MpegTSPCR * pcrtable)
1954 {
1955   PCROffsetCurrent *current = pcrtable->current;
1956   PCROffsetGroup *group = current->group;
1957
1958   if (group == NULL)
1959     return;
1960   GST_DEBUG ("Closing group and resetting current");
1961
1962   /* Store last values */
1963   _append_group_values (group, current->pending[current->last]);
1964   memset (current, 0, sizeof (PCROffsetCurrent));
1965   /* And re-evaluate all groups */
1966 }
1967
1968 static void
1969 record_pcr (MpegTSPacketizer2 * packetizer, MpegTSPCR * pcrtable,
1970     guint64 pcr, guint64 offset)
1971 {
1972   PCROffsetCurrent *current = pcrtable->current;
1973   gint64 corpcr, coroffset;
1974
1975   packetizer->nb_seen_offsets += 1;
1976
1977   pcrtable->last_pcrtime = PCRTIME_TO_GSTTIME (pcr);
1978   /* FIXME : Invert logic later (probability is higher that we have a
1979    * current estimator) */
1980
1981   /* Check for current */
1982   if (G_UNLIKELY (current->group == NULL)) {
1983     PCROffsetGroup *prev = NULL;
1984     GList *tmp;
1985     /* No current estimator. This happens for the initial value, or after
1986      * discont and flushes. Figure out where we need to record this position.
1987      *
1988      * Possible choices:
1989      * 1) No groups at all:
1990      *    Create a new group with pcr/offset
1991      *    Initialize current to that group
1992      * 2) Entirely within an existing group
1993      *    bail out (FIXME: Make this detection faster)
1994      * 3) Not in any group
1995      *    Create a new group with pcr/offset at the right position
1996      *    Initialize current to that group
1997      */
1998     GST_DEBUG ("No current window estimator, Checking for group to use");
1999     for (tmp = pcrtable->groups; tmp; tmp = tmp->next) {
2000       PCROffsetGroup *group = (PCROffsetGroup *) tmp->data;
2001
2002       GST_DEBUG ("First PCR:%" GST_TIME_FORMAT " offset:%" G_GUINT64_FORMAT
2003           " PCR_offset:%" GST_TIME_FORMAT,
2004           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->first_pcr)),
2005           group->first_offset,
2006           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->pcr_offset)));
2007       GST_DEBUG ("Last PCR: +%" GST_TIME_FORMAT " offset: +%" G_GUINT64_FORMAT,
2008           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->values[group->last_value].
2009                   pcr)), group->values[group->last_value].offset);
2010       /* Check if before group */
2011       if (offset < group->first_offset) {
2012         GST_DEBUG ("offset is before that group");
2013         break;
2014       }
2015       /* Check if within group */
2016       if (offset <=
2017           (group->values[group->last_value].offset + group->first_offset)) {
2018         GST_DEBUG ("Already observed PCR offset %" G_GUINT64_FORMAT, offset);
2019         return;
2020       }
2021       /* Check if just after group (i.e. continuation of it) */
2022       if (!(group->flags & PCR_GROUP_FLAG_CLOSED) &&
2023           pcr - group->first_pcr - group->values[group->last_value].pcr <=
2024           100 * PCR_MSECOND) {
2025         GST_DEBUG ("Continuation of existing group");
2026         _use_group (pcrtable, group);
2027         return;
2028       }
2029       /* Else after group */
2030       prev = group;
2031     }
2032     _set_current_group (pcrtable, prev, pcr, offset, FALSE);
2033     return;
2034   }
2035
2036   corpcr = pcr - current->first_pcr;
2037   coroffset = offset - current->first_offset;
2038
2039   /* FIXME : Detect if we've gone into the next group !
2040    * FIXME : Close group when that happens */
2041   GST_DEBUG ("first:%d, last:%d, write:%d", current->first, current->last,
2042       current->write);
2043   GST_DEBUG ("First PCR:%" GST_TIME_FORMAT " offset:%" G_GUINT64_FORMAT,
2044       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (current->first_pcr)),
2045       current->first_offset);
2046   GST_DEBUG ("Last PCR: +%" GST_TIME_FORMAT " offset: +%" G_GUINT64_FORMAT,
2047       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (current->pending[current->last].pcr)),
2048       current->pending[current->last].offset);
2049   GST_DEBUG ("To add (corrected) PCR:%" GST_TIME_FORMAT " offset:%"
2050       G_GINT64_FORMAT, GST_TIME_ARGS (PCRTIME_TO_GSTTIME (corpcr)), coroffset);
2051
2052   /* Do we need to close the current group ? */
2053   /* Check for wrapover/discont */
2054   if (G_UNLIKELY (corpcr < current->pending[current->last].pcr)) {
2055     /* FIXME : ignore very small deltas (< 500ms ?) which are most likely
2056      * stray values */
2057     GST_DEBUG
2058         ("PCR smaller than previously observed one, handling discont/wrapover");
2059     /* Take values from current and put them in the current group (closing it) */
2060     /* Create new group with new pcr/offset just after the current group
2061      * and mark it as a wrapover */
2062     /* Initialize current to that group with new values */
2063     _append_group_values (current->group, current->pending[current->last]);
2064     _set_current_group (pcrtable, current->group, pcr, offset, TRUE);
2065     return;
2066   }
2067   /* If PCR diff is greater than 500ms, create new group */
2068   if (G_UNLIKELY (corpcr - current->pending[current->last].pcr >
2069           500 * PCR_MSECOND)) {
2070     GST_DEBUG ("New PCR more than 500ms away, handling discont");
2071     /* Take values from current and put them in the current group (closing it) */
2072     /* Create new group with pcr/offset just after the current group
2073      * and mark it as a discont */
2074     /* Initialize current to that group with new values */
2075     _append_group_values (current->group, current->pending[current->last]);
2076     _set_current_group (pcrtable, current->group, pcr, offset, TRUE);
2077     return;
2078   }
2079
2080   if (G_UNLIKELY (corpcr == current->last_value.pcr)) {
2081     GST_DEBUG ("Ignoring same PCR (stream is drunk)");
2082     return;
2083   }
2084
2085   /* update current window */
2086   current->pending[current->write].pcr = corpcr;
2087   current->pending[current->write].offset = coroffset;
2088   current->last_value = current->pending[current->write];
2089   current->last = (current->last + 1) % PCR_BITRATE_NEEDED;
2090   current->write = (current->write + 1) % PCR_BITRATE_NEEDED;
2091
2092   GST_DEBUG ("first:%d, last:%d, write:%d", current->first, current->last,
2093       current->write);
2094   GST_DEBUG ("First PCR:%" GST_TIME_FORMAT " offset:%" G_GUINT64_FORMAT,
2095       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (current->first_pcr)),
2096       current->first_offset);
2097   GST_DEBUG ("Last PCR: +%" GST_TIME_FORMAT " offset: +%" G_GUINT64_FORMAT,
2098       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (current->pending[current->last].pcr)),
2099       current->pending[current->last].offset);
2100
2101   /* If we haven't stored enough values, bail out */
2102   if (current->write != current->first) {
2103     GST_DEBUG
2104         ("Not enough observations to calculate bitrate (first:%d, last:%d)",
2105         current->first, current->last);
2106     return;
2107   }
2108
2109   /* If we are at least 1s away from reference value AND we have filled our
2110    * window, we can start comparing bitrates */
2111   if (current->pending[current->first].pcr - current->prev.pcr > PCR_SECOND) {
2112     /* Calculate window bitrate */
2113     current->cur_bitrate = gst_util_uint64_scale (PCR_SECOND,
2114         current->pending[current->last].offset -
2115         current->pending[current->first].offset,
2116         current->pending[current->last].pcr -
2117         current->pending[current->first].pcr);
2118     GST_DEBUG ("Current bitrate is now %" G_GUINT64_FORMAT,
2119         current->cur_bitrate);
2120
2121     /* Calculate previous bitrate */
2122     current->prev_bitrate =
2123         gst_util_uint64_scale (PCR_SECOND,
2124         current->pending[current->first].offset - current->prev.offset,
2125         current->pending[current->first].pcr - current->prev.pcr);
2126     GST_DEBUG ("Previous group bitrate now %" G_GUINT64_FORMAT,
2127         current->prev_bitrate);
2128
2129     /* FIXME : Better bitrate changes ? Currently 10% changes */
2130     if (ABSDIFF (current->cur_bitrate,
2131             current->prev_bitrate) * 10 > current->prev_bitrate) {
2132       GST_DEBUG ("Current bitrate changed by more than 10%% (old:%"
2133           G_GUINT64_FORMAT " new:%" G_GUINT64_FORMAT ")", current->prev_bitrate,
2134           current->cur_bitrate);
2135       /* If we detected a change in bitrate, this means that
2136        * d(first - prev) is a different bitrate than d(last - first).
2137        *
2138        * Two conclusions can be made:
2139        * 1) d(first - prev) is a complete bitrate "chain" (values between the
2140        *    reference value and first pending value have consistent bitrate).
2141        * 2) next values (from second pending value onwards) will no longer have
2142        *    the same bitrate.
2143        *
2144        * The question remains as to how long the new bitrate change is going to
2145        * last for (it might be short or longer term). For this we need to restart
2146        * bitrate estimation.
2147        *
2148        * * We move over first to the last value of group (a new chain ends and
2149        *   starts from there)
2150        * * We remember that last group value as our new window reference
2151        * * We restart our window filing from the last observed value
2152        *
2153        * Once our new window is filled we will end up in two different scenarios:
2154        * 1) Either the bitrate change was consistent, and therefore the bitrate
2155        *    will have remained constant over at least 2 window length
2156        * 2) The bitrate change was very short (1 window duration) and we will
2157        *    close that chain and restart again.
2158        * X) And of course if any discont/gaps/wrapover happen in the meantime they
2159        *    will also close the group.
2160        */
2161       _append_group_values (current->group, current->pending[current->first]);
2162       current->prev = current->pending[current->first];
2163       current->first = current->last;
2164       current->write = (current->first + 1) % PCR_BITRATE_NEEDED;
2165       return;
2166     }
2167   }
2168
2169   /* Update read position */
2170   current->first = (current->first + 1) % PCR_BITRATE_NEEDED;
2171 }
2172
2173
2174 /* convert specified offset into stream time */
2175 GstClockTime
2176 mpegts_packetizer_offset_to_ts (MpegTSPacketizer2 * packetizer,
2177     guint64 offset, guint16 pid)
2178 {
2179   PCROffsetGroup *last;
2180   MpegTSPCR *pcrtable;
2181   GList *tmp;
2182   GstClockTime res;
2183   guint64 lastpcr, lastoffset;
2184
2185   GST_DEBUG ("offset %" G_GUINT64_FORMAT, offset);
2186
2187   if (G_UNLIKELY (!packetizer->calculate_offset))
2188     return GST_CLOCK_TIME_NONE;
2189
2190   if (G_UNLIKELY (packetizer->refoffset == -1))
2191     return GST_CLOCK_TIME_NONE;
2192
2193   if (G_UNLIKELY (offset < packetizer->refoffset))
2194     return GST_CLOCK_TIME_NONE;
2195
2196   PACKETIZER_GROUP_LOCK (packetizer);
2197
2198   pcrtable = get_pcr_table (packetizer, pid);
2199
2200   if (g_list_length (pcrtable->groups) < 1) {
2201     PACKETIZER_GROUP_UNLOCK (packetizer);
2202     GST_WARNING ("Not enough observations to return a duration estimate");
2203     return GST_CLOCK_TIME_NONE;
2204   }
2205
2206   if (g_list_length (pcrtable->groups) > 1) {
2207     GST_LOG ("Using last group");
2208
2209     /* FIXME : Refine this later to use neighbouring groups */
2210     tmp = g_list_last (pcrtable->groups);
2211     last = tmp->data;
2212
2213     if (G_UNLIKELY (last->flags & PCR_GROUP_FLAG_ESTIMATED))
2214       _reevaluate_group_pcr_offset (pcrtable, last);
2215
2216     /* lastpcr is the full value in PCR from the first first chunk of data */
2217     lastpcr = last->values[last->last_value].pcr + last->pcr_offset;
2218     /* lastoffset is the full offset from the first chunk of data */
2219     lastoffset =
2220         last->values[last->last_value].offset + last->first_offset -
2221         packetizer->refoffset;
2222   } else {
2223     PCROffsetCurrent *current = pcrtable->current;
2224
2225     if (!current->group) {
2226       PACKETIZER_GROUP_UNLOCK (packetizer);
2227       GST_LOG ("No PCR yet");
2228       return GST_CLOCK_TIME_NONE;
2229     }
2230     /* If doing progressive read, use current */
2231     GST_LOG ("Using current group");
2232     lastpcr = current->group->pcr_offset + current->pending[current->last].pcr;
2233     lastoffset = current->first_offset + current->pending[current->last].offset;
2234   }
2235   GST_DEBUG ("lastpcr:%" GST_TIME_FORMAT " lastoffset:%" G_GUINT64_FORMAT
2236       " refoffset:%" G_GUINT64_FORMAT,
2237       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (lastpcr)), lastoffset,
2238       packetizer->refoffset);
2239
2240   /* Convert byte difference into time difference (and transformed from 27MHz to 1GHz) */
2241   res =
2242       PCRTIME_TO_GSTTIME (gst_util_uint64_scale (offset - packetizer->refoffset,
2243           lastpcr, lastoffset));
2244
2245   PACKETIZER_GROUP_UNLOCK (packetizer);
2246
2247   GST_DEBUG ("Returning timestamp %" GST_TIME_FORMAT " for offset %"
2248       G_GUINT64_FORMAT, GST_TIME_ARGS (res), offset);
2249
2250   return res;
2251 }
2252
2253 /* Input  : local PTS (in GHz units)
2254  * Return : Stream time (in GHz units) */
2255 GstClockTime
2256 mpegts_packetizer_pts_to_ts (MpegTSPacketizer2 * packetizer,
2257     GstClockTime pts, guint16 pcr_pid)
2258 {
2259   GstClockTime res = GST_CLOCK_TIME_NONE;
2260   MpegTSPCR *pcrtable;
2261
2262   PACKETIZER_GROUP_LOCK (packetizer);
2263   pcrtable = get_pcr_table (packetizer, pcr_pid);
2264
2265   if (!GST_CLOCK_TIME_IS_VALID (pcrtable->base_time) && pcr_pid == 0x1fff &&
2266       GST_CLOCK_TIME_IS_VALID (packetizer->last_in_time)) {
2267     pcrtable->base_time = packetizer->last_in_time;
2268     pcrtable->base_pcrtime = pts;
2269   }
2270
2271   /* Use clock skew if present */
2272   if (packetizer->calculate_skew
2273       && GST_CLOCK_TIME_IS_VALID (pcrtable->base_time)) {
2274     GST_DEBUG ("pts %" GST_TIME_FORMAT " base_pcrtime:%" GST_TIME_FORMAT
2275         " base_time:%" GST_TIME_FORMAT " pcroffset:%" GST_TIME_FORMAT,
2276         GST_TIME_ARGS (pts),
2277         GST_TIME_ARGS (pcrtable->base_pcrtime),
2278         GST_TIME_ARGS (pcrtable->base_time),
2279         GST_TIME_ARGS (pcrtable->pcroffset));
2280     res = pts + pcrtable->pcroffset;
2281
2282     /* Don't return anything if we differ too much against last seen PCR */
2283     /* FIXME : Ideally we want to figure out whether we have a wraparound or
2284      * a reset so we can provide actual values.
2285      * That being said, this will only happen for the small interval of time
2286      * where PTS/DTS are wrapping just before we see the first reset/wrap PCR
2287      */
2288     if (G_UNLIKELY (pcr_pid != 0x1fff &&
2289             ABSDIFF (res, pcrtable->last_pcrtime) > 15 * GST_SECOND))
2290       res = GST_CLOCK_TIME_NONE;
2291     else {
2292       GstClockTime tmp = pcrtable->base_time + pcrtable->skew;
2293       if (tmp + res >= pcrtable->base_pcrtime)
2294         res += tmp - pcrtable->base_pcrtime;
2295       else
2296         res = GST_CLOCK_TIME_NONE;
2297     }
2298   } else if (packetizer->calculate_offset && pcrtable->groups) {
2299     gint64 refpcr = G_MAXINT64, refpcroffset;
2300     PCROffsetGroup *group = pcrtable->current->group;
2301
2302     /* Generic calculation:
2303      * Stream Time = PTS - first group PCR + group PCR_offset
2304      *
2305      * In case of wrapover:
2306      * Stream Time = PTS + MAX_PCR - first group PCR + group PCR_offset
2307      * (which we actually do by using first group PCR -= MAX_PCR in order
2308      *  to end up with the same calculation as for non-wrapover) */
2309
2310     if (group) {
2311       /* If we have a current group the value is pretty much guaranteed */
2312       GST_DEBUG ("Using current First PCR:%" GST_TIME_FORMAT " offset:%"
2313           G_GUINT64_FORMAT " PCR_offset:%" GST_TIME_FORMAT,
2314           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->first_pcr)),
2315           group->first_offset,
2316           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->pcr_offset)));
2317       refpcr = group->first_pcr;
2318       refpcroffset = group->pcr_offset;
2319       if (pts < PCRTIME_TO_GSTTIME (refpcr)) {
2320         /* Only apply wrapover if we're certain it is, and avoid
2321          * returning bogus values if it's a PTS/DTS which is *just*
2322          * before the start of the current group
2323          */
2324         if (PCRTIME_TO_GSTTIME (refpcr) - pts > GST_SECOND) {
2325           pts += PCR_GST_MAX_VALUE;
2326         } else
2327           refpcr = G_MAXINT64;
2328       }
2329     } else {
2330       GList *tmp;
2331       /* Otherwise, find a suitable group */
2332
2333       GST_DEBUG ("Find group for current offset %" G_GUINT64_FORMAT,
2334           packetizer->offset);
2335
2336       for (tmp = pcrtable->groups; tmp; tmp = tmp->next) {
2337         PCROffsetGroup *tgroup = tmp->data;
2338         GST_DEBUG ("Trying First PCR:%" GST_TIME_FORMAT " offset:%"
2339             G_GUINT64_FORMAT " PCR_offset:%" GST_TIME_FORMAT,
2340             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (tgroup->first_pcr)),
2341             tgroup->first_offset,
2342             GST_TIME_ARGS (PCRTIME_TO_GSTTIME (tgroup->pcr_offset)));
2343         /* Gone too far ? */
2344         if (tgroup->first_offset > packetizer->offset) {
2345           /* If there isn't a pending reset, use that value */
2346           if (group) {
2347             GST_DEBUG ("PTS is %" GST_TIME_FORMAT " into group",
2348                 GST_TIME_ARGS (pts - PCRTIME_TO_GSTTIME (group->first_pcr)));
2349           }
2350           break;
2351         }
2352         group = tgroup;
2353         /* In that group ? */
2354         if (group->first_offset + group->values[group->last_value].offset >
2355             packetizer->offset) {
2356           GST_DEBUG ("PTS is %" GST_TIME_FORMAT " into group",
2357               GST_TIME_ARGS (pts - PCRTIME_TO_GSTTIME (group->first_pcr)));
2358           break;
2359         }
2360       }
2361       if (group && !(group->flags & PCR_GROUP_FLAG_RESET)) {
2362         GST_DEBUG ("Using group !");
2363         refpcr = group->first_pcr;
2364         refpcroffset = group->pcr_offset;
2365         if (pts < PCRTIME_TO_GSTTIME (refpcr)) {
2366           if (PCRTIME_TO_GSTTIME (refpcr) - pts > GST_SECOND)
2367             pts += PCR_GST_MAX_VALUE;
2368           else
2369             refpcr = G_MAXINT64;
2370         }
2371       }
2372     }
2373     if (refpcr != G_MAXINT64)
2374       res =
2375           pts - PCRTIME_TO_GSTTIME (refpcr) + PCRTIME_TO_GSTTIME (refpcroffset);
2376     else
2377       GST_WARNING ("No groups, can't calculate timestamp");
2378   } else
2379     GST_WARNING ("Not enough information to calculate proper timestamp");
2380
2381   PACKETIZER_GROUP_UNLOCK (packetizer);
2382
2383   GST_DEBUG ("Returning timestamp %" GST_TIME_FORMAT " for pts %"
2384       GST_TIME_FORMAT " pcr_pid:0x%04x", GST_TIME_ARGS (res),
2385       GST_TIME_ARGS (pts), pcr_pid);
2386   return res;
2387 }
2388
2389 /* Stream time to offset */
2390 guint64
2391 mpegts_packetizer_ts_to_offset (MpegTSPacketizer2 * packetizer,
2392     GstClockTime ts, guint16 pcr_pid)
2393 {
2394   MpegTSPCR *pcrtable;
2395   guint64 res;
2396   PCROffsetGroup *nextgroup = NULL, *prevgroup = NULL;
2397   guint64 querypcr, firstpcr, lastpcr, firstoffset, lastoffset;
2398   PCROffsetCurrent *current;
2399   GList *tmp;
2400
2401   if (!packetizer->calculate_offset)
2402     return -1;
2403
2404   PACKETIZER_GROUP_LOCK (packetizer);
2405   pcrtable = get_pcr_table (packetizer, pcr_pid);
2406
2407   if (pcrtable->groups == NULL) {
2408     PACKETIZER_GROUP_UNLOCK (packetizer);
2409     return -1;
2410   }
2411
2412   querypcr = GSTTIME_TO_PCRTIME (ts);
2413
2414   GST_DEBUG ("Searching offset for ts %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2415
2416   /* First check if we're within the current pending group */
2417   current = pcrtable->current;
2418   if (current && current->group && (querypcr >= current->group->pcr_offset) &&
2419       querypcr - current->group->pcr_offset <=
2420       current->pending[current->last].pcr) {
2421     GST_DEBUG ("pcr is in current group");
2422     nextgroup = current->group;
2423     goto calculate_points;
2424   }
2425
2426   /* Find the neighbouring groups */
2427   for (tmp = pcrtable->groups; tmp; tmp = tmp->next) {
2428     nextgroup = (PCROffsetGroup *) tmp->data;
2429
2430     GST_DEBUG ("Trying group PCR %" GST_TIME_FORMAT " (offset %"
2431         G_GUINT64_FORMAT " pcr_offset %" GST_TIME_FORMAT,
2432         GST_TIME_ARGS (PCRTIME_TO_GSTTIME (nextgroup->first_pcr)),
2433         nextgroup->first_offset,
2434         GST_TIME_ARGS (PCRTIME_TO_GSTTIME (nextgroup->pcr_offset)));
2435
2436     /* Check if we've gone too far */
2437     if (nextgroup->pcr_offset > querypcr) {
2438       GST_DEBUG ("pcr is before that group");
2439       break;
2440     }
2441
2442     if (tmp->next == NULL) {
2443       GST_DEBUG ("pcr is beyond last group");
2444       break;
2445     }
2446
2447     prevgroup = nextgroup;
2448
2449     /* Maybe it's in this group */
2450     if (nextgroup->values[nextgroup->last_value].pcr +
2451         nextgroup->pcr_offset >= querypcr) {
2452       GST_DEBUG ("pcr is in that group");
2453       break;
2454     }
2455   }
2456
2457 calculate_points:
2458
2459   GST_DEBUG ("nextgroup:%p, prevgroup:%p", nextgroup, prevgroup);
2460
2461   if (nextgroup == prevgroup || prevgroup == NULL) {
2462     /* We use the current group to calculate position:
2463      * * if the PCR is within this group
2464      * * if there is only one group to use for calculation
2465      */
2466     GST_DEBUG ("In group or after last one");
2467     lastoffset = firstoffset = nextgroup->first_offset;
2468     lastpcr = firstpcr = nextgroup->pcr_offset;
2469     if (current && nextgroup == current->group) {
2470       lastoffset += current->pending[current->last].offset;
2471       lastpcr += current->pending[current->last].pcr;
2472     } else {
2473       lastoffset += nextgroup->values[nextgroup->last_value].offset;
2474       lastpcr += nextgroup->values[nextgroup->last_value].pcr;
2475     }
2476   } else {
2477     GST_DEBUG ("Between group");
2478     lastoffset = nextgroup->first_offset;
2479     lastpcr = nextgroup->pcr_offset;
2480     firstoffset =
2481         prevgroup->values[prevgroup->last_value].offset +
2482         prevgroup->first_offset;
2483     firstpcr =
2484         prevgroup->values[prevgroup->last_value].pcr + prevgroup->pcr_offset;
2485   }
2486
2487   PACKETIZER_GROUP_UNLOCK (packetizer);
2488
2489   GST_DEBUG ("Using prev PCR %" G_GUINT64_FORMAT " offset %" G_GUINT64_FORMAT,
2490       firstpcr, firstoffset);
2491   GST_DEBUG ("Using last PCR %" G_GUINT64_FORMAT " offset %" G_GUINT64_FORMAT,
2492       lastpcr, lastoffset);
2493
2494   res = firstoffset;
2495   if (lastpcr != firstpcr)
2496     res += gst_util_uint64_scale (querypcr - firstpcr,
2497         lastoffset - firstoffset, lastpcr - firstpcr);
2498
2499   GST_DEBUG ("Returning offset %" G_GUINT64_FORMAT " for ts %"
2500       GST_TIME_FORMAT, res, GST_TIME_ARGS (ts));
2501
2502   return res;
2503 }
2504
2505 void
2506 mpegts_packetizer_set_reference_offset (MpegTSPacketizer2 * packetizer,
2507     guint64 refoffset)
2508 {
2509   GST_DEBUG ("Setting reference offset to %" G_GUINT64_FORMAT, refoffset);
2510
2511   PACKETIZER_GROUP_LOCK (packetizer);
2512   packetizer->refoffset = refoffset;
2513   PACKETIZER_GROUP_UNLOCK (packetizer);
2514 }
2515
2516 void
2517 mpegts_packetizer_set_pcr_discont_threshold (MpegTSPacketizer2 * packetizer,
2518     GstClockTime threshold)
2519 {
2520   PACKETIZER_GROUP_LOCK (packetizer);
2521   packetizer->pcr_discont_threshold = threshold;
2522   PACKETIZER_GROUP_UNLOCK (packetizer);
2523 }
2524
2525 void
2526 mpegts_packetizer_set_current_pcr_offset (MpegTSPacketizer2 * packetizer,
2527     GstClockTime offset, guint16 pcr_pid)
2528 {
2529   guint64 pcr_offset;
2530   gint64 delta;
2531   MpegTSPCR *pcrtable;
2532   PCROffsetGroup *group;
2533   GList *tmp;
2534   gboolean apply = FALSE;
2535
2536   /* fast path */
2537   PACKETIZER_GROUP_LOCK (packetizer);
2538   pcrtable = get_pcr_table (packetizer, pcr_pid);
2539
2540   if (pcrtable == NULL || pcrtable->current->group == NULL) {
2541     PACKETIZER_GROUP_UNLOCK (packetizer);
2542     return;
2543   }
2544
2545   pcr_offset = GSTTIME_TO_PCRTIME (offset);
2546
2547   /* Pick delta from *first* group */
2548   if (pcrtable->groups)
2549     group = pcrtable->groups->data;
2550   else
2551     group = pcrtable->current->group;
2552   GST_DEBUG ("Current group PCR %" GST_TIME_FORMAT " (offset %"
2553       G_GUINT64_FORMAT " pcr_offset %" GST_TIME_FORMAT,
2554       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->first_pcr)),
2555       group->first_offset,
2556       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (group->pcr_offset)));
2557
2558   /* Remember the difference between previous initial pcr_offset and
2559    * new initial pcr_offset */
2560   delta = pcr_offset - group->pcr_offset;
2561   if (delta == 0) {
2562     GST_DEBUG ("No shift to apply");
2563     PACKETIZER_GROUP_UNLOCK (packetizer);
2564     return;
2565   }
2566   GST_DEBUG ("Shifting groups by %" GST_TIME_FORMAT
2567       " for new initial pcr_offset %" GST_TIME_FORMAT,
2568       GST_TIME_ARGS (PCRTIME_TO_GSTTIME (delta)), GST_TIME_ARGS (offset));
2569
2570   for (tmp = pcrtable->groups; tmp; tmp = tmp->next) {
2571     PCROffsetGroup *tgroup = (tmp->data);
2572     if (tgroup == group)
2573       apply = TRUE;
2574     if (apply) {
2575       tgroup->pcr_offset += delta;
2576       GST_DEBUG ("Update group PCR %" GST_TIME_FORMAT " (offset %"
2577           G_GUINT64_FORMAT " pcr_offset %" GST_TIME_FORMAT,
2578           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (tgroup->first_pcr)),
2579           tgroup->first_offset,
2580           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (tgroup->pcr_offset)));
2581     } else
2582       GST_DEBUG ("Not modifying group PCR %" GST_TIME_FORMAT " (offset %"
2583           G_GUINT64_FORMAT " pcr_offset %" GST_TIME_FORMAT,
2584           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (tgroup->first_pcr)),
2585           tgroup->first_offset,
2586           GST_TIME_ARGS (PCRTIME_TO_GSTTIME (tgroup->pcr_offset)));
2587   }
2588   PACKETIZER_GROUP_UNLOCK (packetizer);
2589 }