From 8cf50dbc441688f6b823b9ccc5ec57869e6344ef Mon Sep 17 00:00:00 2001 From: Jan Schmidt Date: Wed, 24 Feb 2016 16:57:16 +1100 Subject: [PATCH] hlsdemux: Do some reading and dumping of PCRs Read PCRs out of the MPEG-TS stream in a basic way and (for now) just log them --- ext/hls/Makefile.am | 1 + ext/hls/gsthlsdemux-util.c | 212 +++++++++++++++++++++++++++++++++++++ ext/hls/gsthlsdemux.c | 50 ++++++++- ext/hls/gsthlsdemux.h | 25 ++++- 4 files changed, 281 insertions(+), 7 deletions(-) create mode 100644 ext/hls/gsthlsdemux-util.c diff --git a/ext/hls/Makefile.am b/ext/hls/Makefile.am index 189f9e6e8..7aab9d882 100644 --- a/ext/hls/Makefile.am +++ b/ext/hls/Makefile.am @@ -4,6 +4,7 @@ plugin_LTLIBRARIES = libgsthls.la libgsthls_la_SOURCES = \ m3u8.c \ gsthlsdemux.c \ + gsthlsdemux-util.c \ gsthlsplugin.c \ gsthlssink.c \ gstm3u8playlist.c diff --git a/ext/hls/gsthlsdemux-util.c b/ext/hls/gsthlsdemux-util.c new file mode 100644 index 000000000..39bb75f41 --- /dev/null +++ b/ext/hls/gsthlsdemux-util.c @@ -0,0 +1,212 @@ +#include +#include + +#include "gsthlsdemux.h" + +GST_DEBUG_CATEGORY_EXTERN (gst_hls_demux_debug); +#define GST_CAT_DEFAULT gst_hls_demux_debug + +/* Check for sync byte, error_indicator == 0 and packet has payload. + * Adaptation control field (data[3] & 0x30) may be zero for TS packets with + * null PIDs. Still, these streams are valid TS streams (for null packets, + * AFC is supposed to be 0x1, but the spec also says decoders should just + * discard any packets with AFC = 0x00) */ +#define IS_MPEGTS_HEADER(data) (data[0] == 0x47 && \ + (data[1] & 0x80) == 0x00 && \ + ((data[3] & 0x30) != 0x00 || \ + ((data[3] & 0x30) == 0x00 && (data[1] & 0x1f) == 0x1f && (data[2] & 0xff) == 0xff))) + +static gboolean +have_ts_sync (const guint8 * data, guint size, guint packet_size, guint num) +{ + while (num-- > 0) { + if (size < packet_size) + return FALSE; + if (!IS_MPEGTS_HEADER (data)) + return FALSE; + data += packet_size; + size -= packet_size; + } + return TRUE; +} + +static gint +find_offset (GstHLSTSReader * r, const guint8 * data, guint size) +{ + guint sync_points = CLAMP (size / 188, 25, 100); + guint off; + const gint packet_size = 188; + + /* FIXME: check 192 as well, and maybe also 204, 208 */ + for (off = 0; off < MIN (size, packet_size); ++off) { + if (have_ts_sync (data + off, size - off, packet_size, sync_points)) { + r->packet_size = packet_size; + return off; + } + } + return -1; +} + +static gboolean +handle_pcr (GstHLSTSReader * r, const guint8 * data, guint size) +{ + const guint8 *p = data; + guint32 hdr = GST_READ_UINT32_BE (p); + guint af_len, flags; + + guint64 pcr_base, pcr_ext, pcr, ts; + + data = p + 4; + if ((hdr & 0x00000020) == 0) /* has_adaptation_field */ + return FALSE; + af_len = p[4]; /* adaptation_field_len */ + ++data; + if (af_len < (1 + 6) || af_len > r->packet_size - (4 + 1)) + return FALSE; + flags = data[0]; + /* Does the packet have a PCR? */ + if ((flags & 0x10) == 0) + return FALSE; + ++data; + --af_len; + pcr_base = (GST_READ_UINT64_BE (data) >> 16) >> (6 + 9); + pcr_ext = (GST_READ_UINT64_BE (data) >> 16) & 0x1ff; + pcr = pcr_base * 300 + pcr_ext; +#define PCRTIME_TO_GSTTIME(t) (((t) * (guint64)1000) / 27) + ts = PCRTIME_TO_GSTTIME (pcr); + GST_LOG ("have PCR! %" G_GUINT64_FORMAT "\t%" GST_TIME_FORMAT, + pcr, GST_TIME_ARGS (ts)); + if (r->first_pcr == GST_CLOCK_TIME_NONE) + r->first_pcr = ts; + r->last_pcr = ts; + + return TRUE; +} + +static gboolean +handle_pmt (GstHLSTSReader * r, const guint8 * data, guint size) +{ + const guint8 *p = data; + guint32 hdr = GST_READ_UINT32_BE (p); + guint slen, pcr_pid; + + data = p + 4; + if ((hdr & 0x00000020) != 0) /* has_adaptation_field */ + data += 1 + p[4]; /* adaptation_field_len */ + data += 1 + data[0]; /* pointer_field */ + if (data[0] != 0x02) /* table_id */ + return FALSE; + //gst_util_dump_mem (data, 8); + /* we assume the entire PMT fits into a single packet and this is it */ + if (data[6] != 0 || data[6] != data[7]) + return FALSE; + slen = GST_READ_UINT16_BE (data + 1) & 0x0FFF; + if (slen > (gsize) (p + r->packet_size - (data + 1 + 2)) || slen < 5 + 2 + 4) + return FALSE; + data += 3 + 5; + slen -= 5; /* bytes after section_length field itself */ + slen -= 4; /* crc at end */ + pcr_pid = GST_READ_UINT16_BE (data) & 0x1fff; + if (pcr_pid != 0x1fff) { + GST_DEBUG ("pcr_pid now: %04x", pcr_pid); + r->pcr_pid = pcr_pid; + return TRUE; + } + + return FALSE; +} + +static gboolean +handle_pat (GstHLSTSReader * r, const guint8 * data, guint size) +{ + const guint8 *p = data; + guint32 hdr = GST_READ_UINT32_BE (p); + guint slen; + + data = p + 4; + if ((hdr & 0x00000020) != 0) /* has_adaptation_field */ + data += 1 + p[4]; /* adaptation_field_len */ + data += 1 + data[0]; /* pointer_field */ + if (data[0] != 0) /* table_id */ + return FALSE; + /* we assume the entire PAT fits into a single packet and this is it */ + if (data[6] != 0 || data[6] != data[7]) + return FALSE; + slen = GST_READ_UINT16_BE (data + 1) & 0x0FFF; + if (slen > (gsize) (p + r->packet_size - (data + 1 + 2)) || slen < 5 + 4 + 4) + return FALSE; + data += 3 + 5; + slen -= 5; /* bytes after section_length field itself */ + slen -= 4; /* crc at end */ + while (slen >= 4) { + guint program_num = GST_READ_UINT16_BE (data); + guint val = GST_READ_UINT16_BE (data + 2) & 0x1fff; + if (program_num != 0) { + GST_DEBUG (" program %04x: pmt_pid : %04x\n", program_num, val); + r->pmt_pid = val; + return TRUE; + } + data += 4; + slen -= 4; + } + + return FALSE; +} + +void +gst_hlsdemux_tsreader_init (GstHLSTSReader * r) +{ + r->packet_size = 188; + r->pmt_pid = r->pcr_pid = -1; + r->first_pcr = GST_CLOCK_TIME_NONE; + r->last_pcr = GST_CLOCK_TIME_NONE; +} + +gboolean +gst_hlsdemux_tsreader_find_pcrs (GstHLSTSReader * r, + const guint8 * data, guint size, GstClockTime * first_pcr, + GstClockTime * last_pcr) +{ + gint offset; + const guint8 *p; + + *first_pcr = *last_pcr = GST_CLOCK_TIME_NONE; + + offset = find_offset (r, data, size); + if (offset < 0) + return FALSE; + + GST_LOG ("TS packet start offset: %d", offset); + + data += offset; + size -= offset; + + for (p = data; size >= r->packet_size; + p += r->packet_size, size -= r->packet_size) { + guint32 hdr = GST_READ_UINT32_BE (p); + + /* sync byte (0x47), error indicator (TEI) not set, PID 0, has_payload */ + if ((hdr & 0xFF9FFF10) == 0x47000010) { + GST_LOG ("Found packet for PID 0000 (PAT)"); + handle_pat (r, p, size); + } + /* sync byte (0x47), error indicator (TEI) not set, has_payload, PID = PMT_pid */ + else if ((hdr & 0xFF800010) == 0x47000010 + && ((hdr >> 8) & 0x1fff) == r->pmt_pid) { + GST_LOG ("Found packet for PID %04x (PMT)", r->pmt_pid); + handle_pmt (r, p, size); + } + /* sync byte (0x47), error indicator (TEI) not set, has_payload */ + else if ((hdr & 0xFF800010) == 0x47000010 + && ((hdr >> 8) & 0x1fff) == r->pcr_pid) { + GST_LOG ("Found packet for PID %04x (PCR)", r->pcr_pid); + handle_pcr (r, p, size); + } + } + + *first_pcr = r->first_pcr; + *last_pcr = r->last_pcr; + + /* Return TRUE if this piece was big enough to get a PCR from */ + return (r->first_pcr != GST_CLOCK_TIME_NONE); +} diff --git a/ext/hls/gsthlsdemux.c b/ext/hls/gsthlsdemux.c index 3b962231d..ed52b8e25 100644 --- a/ext/hls/gsthlsdemux.c +++ b/ext/hls/gsthlsdemux.c @@ -55,7 +55,7 @@ static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-hls")); -GST_DEBUG_CATEGORY_STATIC (gst_hls_demux_debug); +GST_DEBUG_CATEGORY (gst_hls_demux_debug); #define GST_CAT_DEFAULT gst_hls_demux_debug #define GST_M3U8_CLIENT_LOCK(l) /* FIXME */ @@ -248,6 +248,7 @@ gst_hls_demux_stream_clear_pending_data (GstHLSDemuxStream * hls_stream) gst_adapter_clear (hls_stream->pending_encrypted_data); gst_buffer_replace (&hls_stream->pending_decrypted_buffer, NULL); gst_buffer_replace (&hls_stream->pending_typefind_buffer, NULL); + gst_buffer_replace (&hls_stream->pending_pcr_buffer, NULL); hls_stream->current_offset = -1; gst_hls_demux_stream_decrypt_end (hls_stream); } @@ -695,6 +696,9 @@ gst_hls_demux_start_fragment (GstAdaptiveDemux * demux, gst_hls_demux_stream_clear_pending_data (hls_stream); + /* Init the MPEG-TS reader for this fragment */ + gst_hlsdemux_tsreader_init (&hls_stream->tsreader); + /* If no decryption is needed, there's nothing to be done here */ if (hls_stream->current_key == NULL) return TRUE; @@ -727,13 +731,16 @@ gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux, { GstHLSDemuxStream *hls_stream = GST_HLS_DEMUX_STREAM_CAST (stream); // FIXME: pass HlsStream into function GstHLSDemux *hlsdemux = GST_HLS_DEMUX_CAST (demux); + GstMapInfo info; + GstClockTime first_pcr, last_pcr; if (buffer == NULL) return GST_FLOW_OK; + gst_buffer_map (buffer, &info, GST_MAP_READ); + if (G_UNLIKELY (hls_stream->do_typefind)) { GstCaps *caps = NULL; - GstMapInfo info; guint buffer_size; GstTypeFindProbability prob = GST_TYPE_FIND_NONE; @@ -751,9 +758,11 @@ gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux, gst_type_find_helper_for_data (GST_OBJECT_CAST (hlsdemux), info.data, info.size, &prob); } - gst_buffer_unmap (buffer, &info); if (G_UNLIKELY (!caps)) { + /* Won't need this mapping any more all paths return inside this if() */ + gst_buffer_unmap (buffer, &info); + /* Only fail typefinding if we already a good amount of data * and we still don't know the type */ if (buffer_size > (2 * 1024 * 1024) || at_eos) { @@ -776,6 +785,25 @@ gst_hls_demux_handle_buffer (GstAdaptiveDemux * demux, } g_assert (hls_stream->pending_typefind_buffer == NULL); + // Accumulate this buffer + if (hls_stream->pending_pcr_buffer) { + gst_buffer_unmap (buffer, &info); + buffer = gst_buffer_append (hls_stream->pending_pcr_buffer, buffer); + hls_stream->pending_pcr_buffer = NULL; + gst_buffer_map (buffer, &info, GST_MAP_READ); + } + + if (!gst_hlsdemux_tsreader_find_pcrs (&hls_stream->tsreader, info.data, + info.size, &first_pcr, &last_pcr) + && !at_eos) { + gst_buffer_unmap (buffer, &info); + // Store this buffer for later + hls_stream->pending_pcr_buffer = buffer; + return GST_FLOW_OK; + } + + gst_buffer_unmap (buffer, &info); + if (buffer) { buffer = gst_buffer_make_writable (buffer); GST_BUFFER_OFFSET (buffer) = hls_stream->current_offset; @@ -818,6 +846,21 @@ gst_hls_demux_finish_fragment (GstAdaptiveDemux * demux, hls_stream->pending_decrypted_buffer = NULL; } } + + if (ret == GST_FLOW_OK || ret == GST_FLOW_NOT_LINKED) { + if (hls_stream->pending_pcr_buffer) { + GstBuffer *buf = hls_stream->pending_pcr_buffer; + hls_stream->pending_pcr_buffer = NULL; + + ret = gst_hls_demux_handle_buffer (demux, stream, buf, TRUE); + } + + GST_LOG_OBJECT (stream, + "Fragment PCRs were %" GST_TIME_FORMAT " to %" GST_TIME_FORMAT, + GST_TIME_ARGS (hls_stream->tsreader.first_pcr), + GST_TIME_ARGS (hls_stream->tsreader.last_pcr)); + } + gst_hls_demux_stream_clear_pending_data (hls_stream); if (ret == GST_FLOW_OK || ret == GST_FLOW_NOT_LINKED) @@ -888,6 +931,7 @@ gst_hls_demux_stream_free (GstAdaptiveDemuxStream * stream) gst_buffer_replace (&hls_stream->pending_decrypted_buffer, NULL); gst_buffer_replace (&hls_stream->pending_typefind_buffer, NULL); + gst_buffer_replace (&hls_stream->pending_pcr_buffer, NULL); if (hls_stream->current_key) { g_free (hls_stream->current_key); diff --git a/ext/hls/gsthlsdemux.h b/ext/hls/gsthlsdemux.h index c2dce4f39..ee00f0e1f 100644 --- a/ext/hls/gsthlsdemux.h +++ b/ext/hls/gsthlsdemux.h @@ -58,9 +58,20 @@ G_BEGIN_DECLS typedef struct _GstHLSDemux GstHLSDemux; typedef struct _GstHLSDemuxClass GstHLSDemuxClass; typedef struct _GstHLSDemuxStream GstHLSDemuxStream; +typedef struct _GstHLSTSReader GstHLSTSReader; #define GST_HLS_DEMUX_STREAM_CAST(stream) ((GstHLSDemuxStream *)(stream)) +struct _GstHLSTSReader +{ + gint packet_size; + gint pmt_pid; + gint pcr_pid; + + GstClockTime last_pcr; + GstClockTime first_pcr; +}; + struct _GstHLSDemuxStream { GstAdaptiveDemux adaptive_demux_stream; @@ -89,10 +100,12 @@ struct _GstHLSDemuxStream gchar *current_key; guint8 *current_iv; - GstBuffer *pending_buffer; /* decryption scenario: - * the last buffer can only be pushed when - * resized, so need to store and wait for - * EOS to know it is the last */ + /* Accumulator for reading PAT/PMT/PCR from + * the stream so we can set timestamps/segments + * and switch cleanly */ + GstBuffer *pending_pcr_buffer; + + GstHLSTSReader tsreader; }; typedef struct { @@ -126,6 +139,10 @@ struct _GstHLSDemuxClass GstAdaptiveDemuxClass parent_class; }; +void gst_hlsdemux_tsreader_init (GstHLSTSReader *r); +gboolean gst_hlsdemux_tsreader_find_pcrs (GstHLSTSReader *r, const guint8 * data, guint size, + GstClockTime *first_pcr, GstClockTime *last_pcr); + GType gst_hls_demux_get_type (void); G_END_DECLS -- 2.34.1