/* GStreamer RTMP Library
 * Copyright (C) 2017 Make.TV, Inc. <info@make.tv>
 *   Contact: Jan Alexander Steffens (heftig) <jsteffens@make.tv>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
 * Boston, MA 02110-1335, USA.
 */
25 #include "rtmpchunkstream.h"
26 #include "rtmputils.h"
28 GST_DEBUG_CATEGORY_STATIC (gst_rtmp_chunk_stream_debug_category);
29 #define GST_CAT_DEFAULT gst_rtmp_chunk_stream_debug_category
34 static volatile gsize done = 0;
35 if (g_once_init_enter (&done)) {
36 GST_DEBUG_CATEGORY_INIT (gst_rtmp_chunk_stream_debug_category,
37 "rtmpchunkstream", 0, "debug category for rtmp chunk streams");
38 g_once_init_leave (&done, 1);
44 CHUNK_BYTE_TWOBYTE = 0,
45 CHUNK_BYTE_THREEBYTE = 1,
46 CHUNK_BYTE_MASK = 0x3f,
47 CHUNK_STREAM_MIN_TWOBYTE = 0x40,
48 CHUNK_STREAM_MIN_THREEBYTE = 0x140,
49 CHUNK_STREAM_MAX_THREEBYTE = 0x1003f,
60 static const gsize chunk_header_sizes[4] = { 11, 7, 3, 0 };
62 struct _GstRtmpChunkStream
66 GstMapInfo map; /* Only used for parsing */
72 struct _GstRtmpChunkStreams
77 static inline gboolean
78 chunk_stream_is_open (GstRtmpChunkStream * cstream)
80 return cstream->map.data != NULL;
84 chunk_stream_take_buffer (GstRtmpChunkStream * cstream, GstBuffer * buffer)
86 GstRtmpMeta *meta = gst_buffer_get_rtmp_meta (buffer);
88 g_assert (cstream->buffer == NULL);
89 cstream->buffer = buffer;
94 chunk_stream_clear (GstRtmpChunkStream * cstream)
96 if (chunk_stream_is_open (cstream)) {
97 gst_buffer_unmap (cstream->buffer, &cstream->map);
98 cstream->map.data = NULL;
101 gst_buffer_replace (&cstream->buffer, NULL);
102 cstream->meta = NULL;
107 chunk_stream_next_size (GstRtmpChunkStream * cstream, guint32 chunk_size)
109 guint32 size, offset;
111 size = cstream->meta->size;
112 offset = cstream->offset;
114 g_return_val_if_fail (offset <= size, 0);
115 return MIN (size - offset, chunk_size);
118 static inline gboolean
119 needs_ext_ts (GstRtmpMeta * meta)
121 return meta->ts_delta >= 0xffffff;
126 dts_to_abs_ts (GstBuffer * buffer)
128 GstClockTime dts = GST_BUFFER_DTS (buffer);
131 if (GST_CLOCK_TIME_IS_VALID (dts)) {
132 ret = gst_util_uint64_scale_round (dts, 1, GST_MSECOND);
135 GST_TRACE ("Converted DTS %" GST_TIME_FORMAT " into abs TS %"
136 G_GUINT32_FORMAT " ms", GST_TIME_ARGS (dts), ret);
141 dts_diff_to_delta_ts (GstBuffer * old_buffer, GstBuffer * buffer,
144 GstClockTime dts = GST_BUFFER_DTS (buffer),
145 old_dts = GST_BUFFER_DTS (old_buffer);
146 guint32 abs_ts, old_abs_ts, delta_32 = 0;
148 if (!GST_CLOCK_TIME_IS_VALID (dts) || !GST_CLOCK_TIME_IS_VALID (old_dts)) {
149 GST_LOG ("Timestamps not valid; using delta TS 0");
153 if (ABS (GST_CLOCK_DIFF (old_dts, dts)) > GST_MSECOND * G_MAXINT32) {
154 GST_WARNING ("Timestamp delta too large: %" GST_TIME_FORMAT " -> %"
155 GST_TIME_FORMAT, GST_TIME_ARGS (old_dts), GST_TIME_ARGS (dts));
159 abs_ts = gst_util_uint64_scale_round (dts, 1, GST_MSECOND);
160 old_abs_ts = gst_util_uint64_scale_round (old_dts, 1, GST_MSECOND);
162 /* underflow wraps around */
163 delta_32 = abs_ts - old_abs_ts;
165 GST_TRACE ("Converted DTS %" GST_TIME_FORMAT " (%" G_GUINT32_FORMAT
166 " ms) -> %" GST_TIME_FORMAT " (%" G_GUINT32_FORMAT " ms) into delta TS %"
167 G_GUINT32_FORMAT " ms", GST_TIME_ARGS (old_dts), old_abs_ts,
168 GST_TIME_ARGS (dts), abs_ts, delta_32);
176 select_chunk_type (GstRtmpChunkStream * cstream, GstBuffer * buffer)
178 GstBuffer *old_buffer = cstream->buffer;
179 GstRtmpMeta *meta, *old_meta;
181 g_return_val_if_fail (buffer, -1);
183 meta = gst_buffer_get_rtmp_meta (buffer);
185 g_return_val_if_fail (meta, -1);
186 g_return_val_if_fail (gst_rtmp_message_type_is_valid (meta->type), -1);
188 meta->size = gst_buffer_get_size (buffer);
189 meta->cstream = cstream->id;
192 GST_TRACE ("Picking header 0: no previous header");
193 meta->ts_delta = dts_to_abs_ts (buffer);
197 old_meta = gst_buffer_get_rtmp_meta (old_buffer);
198 g_return_val_if_fail (old_meta, -1);
200 if (old_meta->mstream != meta->mstream) {
201 GST_TRACE ("Picking header 0: stream mismatch; "
202 "want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT,
203 old_meta->mstream, meta->mstream);
204 meta->ts_delta = dts_to_abs_ts (buffer);
208 if (!dts_diff_to_delta_ts (old_buffer, buffer, &meta->ts_delta)) {
209 GST_TRACE ("Picking header 0: timestamp delta overflow");
210 meta->ts_delta = dts_to_abs_ts (buffer);
214 /* now at least type 1 */
216 if (old_meta->type != meta->type) {
217 GST_TRACE ("Picking header 1: type mismatch; want %d got %d",
218 old_meta->type, meta->type);
222 if (old_meta->size != meta->size) {
223 GST_TRACE ("Picking header 1: size mismatch; "
224 "want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT,
225 old_meta->size, meta->size);
229 /* now at least type 2 */
231 if (old_meta->ts_delta != meta->ts_delta) {
232 GST_TRACE ("Picking header 2: timestamp delta mismatch; "
233 "want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT,
234 old_meta->ts_delta, meta->ts_delta);
238 /* now at least type 3 */
240 GST_TRACE ("Picking header 3");
245 serialize_next (GstRtmpChunkStream * cstream, guint32 chunk_size,
248 GstRtmpMeta *meta = cstream->meta;
249 guint8 small_stream_id;
250 gsize header_size = chunk_header_sizes[type], offset;
255 GST_TRACE ("Serializing a chunk of type %d, offset %" G_GUINT32_FORMAT,
256 type, cstream->offset);
258 if (cstream->id < CHUNK_STREAM_MIN_TWOBYTE) {
259 small_stream_id = cstream->id;
261 } else if (cstream->id < CHUNK_STREAM_MIN_THREEBYTE) {
262 small_stream_id = CHUNK_BYTE_TWOBYTE;
265 small_stream_id = CHUNK_BYTE_THREEBYTE;
269 ext_ts = needs_ext_ts (meta);
274 GST_TRACE ("Allocating buffer, header size %" G_GSIZE_FORMAT, header_size);
276 ret = gst_buffer_new_allocate (NULL, header_size, NULL);
278 GST_ERROR ("Failed to allocate chunk buffer");
282 if (!gst_buffer_map (ret, &map, GST_MAP_WRITE)) {
283 GST_ERROR ("Failed to map %" GST_PTR_FORMAT, ret);
284 gst_buffer_unref (ret);
288 /* Chunk Basic Header */
289 GST_WRITE_UINT8 (map.data, (type << 6) | small_stream_id);
292 switch (small_stream_id) {
293 case CHUNK_BYTE_TWOBYTE:
294 GST_WRITE_UINT8 (map.data + 1, cstream->id - CHUNK_STREAM_MIN_TWOBYTE);
298 case CHUNK_BYTE_THREEBYTE:
299 GST_WRITE_UINT16_LE (map.data + 1,
300 cstream->id - CHUNK_STREAM_MIN_TWOBYTE);
307 /* SRSLY: "Message stream ID is stored in little-endian format." */
308 GST_WRITE_UINT32_LE (map.data + offset + 7, meta->mstream);
311 GST_WRITE_UINT24_BE (map.data + offset + 3, meta->size);
312 GST_WRITE_UINT8 (map.data + offset + 6, meta->type);
315 GST_WRITE_UINT24_BE (map.data + offset,
316 ext_ts ? 0xffffff : meta->ts_delta);
319 offset += chunk_header_sizes[type];
322 GST_WRITE_UINT32_BE (map.data + offset, meta->ts_delta);
327 g_assert (offset == header_size);
328 GST_MEMDUMP (">>> chunk header", map.data, offset);
330 gst_buffer_unmap (ret, &map);
332 GST_BUFFER_OFFSET (ret) = GST_BUFFER_OFFSET_IS_VALID (cstream->buffer) ?
333 GST_BUFFER_OFFSET (cstream->buffer) + cstream->offset : cstream->bytes;
334 GST_BUFFER_OFFSET_END (ret) = GST_BUFFER_OFFSET (ret);
336 if (meta->size > 0) {
337 guint32 payload_size = chunk_stream_next_size (cstream, chunk_size);
339 GST_TRACE ("Appending %" G_GUINT32_FORMAT " bytes of payload",
342 gst_buffer_copy_into (ret, cstream->buffer, GST_BUFFER_COPY_MEMORY,
343 cstream->offset, payload_size);
345 GST_BUFFER_OFFSET_END (ret) += payload_size;
346 cstream->offset += payload_size;
347 cstream->bytes += payload_size;
349 GST_TRACE ("Chunk has no payload");
352 gst_rtmp_buffer_dump (ret, ">>> chunk");
358 gst_rtmp_chunk_stream_clear (GstRtmpChunkStream * cstream)
360 g_return_if_fail (cstream);
361 GST_LOG ("Clearing chunk stream %" G_GUINT32_FORMAT, cstream->id);
362 chunk_stream_clear (cstream);
366 gst_rtmp_chunk_stream_parse_id (const guint8 * data, gsize size)
371 GST_TRACE ("Not enough bytes to read ID");
375 ret = GST_READ_UINT8 (data) & CHUNK_BYTE_MASK;
378 case CHUNK_BYTE_TWOBYTE:
380 GST_TRACE ("Not enough bytes to read two-byte ID");
384 ret = GST_READ_UINT8 (data + 1) + CHUNK_STREAM_MIN_TWOBYTE;
387 case CHUNK_BYTE_THREEBYTE:
389 GST_TRACE ("Not enough bytes to read three-byte ID");
393 ret = GST_READ_UINT16_LE (data + 1) + CHUNK_STREAM_MIN_TWOBYTE;
397 GST_TRACE ("Parsed chunk stream ID %" G_GUINT32_FORMAT, ret);
402 gst_rtmp_chunk_stream_parse_header (GstRtmpChunkStream * cstream,
403 const guint8 * data, gsize size)
407 const guint8 *message_header;
410 gboolean has_abs_timestamp = FALSE;
412 g_return_val_if_fail (cstream, 0);
413 g_return_val_if_fail (cstream->id == gst_rtmp_chunk_stream_parse_id (data,
416 type = GST_READ_UINT8 (data) >> 6;
417 GST_TRACE ("Parsing chunk stream %" G_GUINT32_FORMAT " header type %d",
420 switch (GST_READ_UINT8 (data) & CHUNK_BYTE_MASK) {
421 case CHUNK_BYTE_TWOBYTE:
424 case CHUNK_BYTE_THREEBYTE:
432 message_header = data + header_size;
433 header_size += chunk_header_sizes[type];
435 if (cstream->buffer) {
436 buffer = cstream->buffer;
437 meta = cstream->meta;
438 g_assert (meta->cstream == cstream->id);
440 buffer = gst_buffer_new ();
441 GST_BUFFER_DTS (buffer) = 0;
442 GST_BUFFER_OFFSET (buffer) = cstream->bytes;
443 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
445 meta = gst_buffer_add_rtmp_meta (buffer);
446 meta->cstream = cstream->id;
448 chunk_stream_take_buffer (cstream, buffer);
449 GST_DEBUG ("Starting parse with new %" GST_PTR_FORMAT, buffer);
452 if (size < header_size) {
453 GST_TRACE ("not enough bytes to read header");
459 has_abs_timestamp = TRUE;
460 /* SRSLY: "Message stream ID is stored in little-endian format." */
461 meta->mstream = GST_READ_UINT32_LE (message_header + 7);
464 meta->type = GST_READ_UINT8 (message_header + 6);
465 meta->size = GST_READ_UINT24_BE (message_header + 3);
468 meta->ts_delta = GST_READ_UINT24_BE (message_header);
471 if (needs_ext_ts (meta)) {
474 if (size < header_size + 4) {
475 GST_TRACE ("not enough bytes to read extended timestamp");
476 return header_size + 4;
479 GST_TRACE ("Reading extended timestamp");
480 timestamp = GST_READ_UINT32_BE (data + header_size);
482 if (type == 3 && meta->ts_delta != timestamp) {
483 GST_WARNING ("Type 3 extended timestamp does not match expected"
484 " timestamp (want %" G_GUINT32_FORMAT " got %" G_GUINT32_FORMAT
485 "); assuming it's not present", meta->ts_delta, timestamp);
487 meta->ts_delta = timestamp;
493 GST_MEMDUMP ("<<< chunk header", data, header_size);
495 if (!chunk_stream_is_open (cstream)) {
496 GstClockTime dts = GST_BUFFER_DTS (buffer);
497 guint32 delta_32, abs_32;
500 if (has_abs_timestamp) {
501 abs_32 = meta->ts_delta;
502 delta_32 = abs_32 - dts / GST_MSECOND;
504 delta_32 = meta->ts_delta;
505 abs_32 = delta_32 + dts / GST_MSECOND;
508 GST_TRACE ("Timestamp delta is %" G_GUINT32_FORMAT " (absolute %"
509 G_GUINT32_FORMAT ")", delta_32, abs_32);
511 /* emulate signed overflow */
513 if (delta_64 > G_MAXINT32) {
514 delta_64 -= G_MAXUINT32;
518 delta_64 *= GST_MSECOND;
520 if (G_LIKELY (delta_64 >= 0)) {
521 /* Normal advancement */
522 } else if (G_LIKELY ((guint64) (-delta_64) <= dts)) {
523 /* In-bounds regression */
524 GST_WARNING ("Timestamp regression: %" GST_STIME_FORMAT,
525 GST_STIME_ARGS (delta_64));
527 /* Out-of-bounds regression */
528 GST_WARNING ("Timestamp regression: %" GST_STIME_FORMAT ", offsetting",
529 GST_STIME_ARGS (delta_64));
530 delta_64 = delta_32 * GST_MSECOND;
533 GST_BUFFER_DTS (buffer) += delta_64;
535 GST_TRACE ("Adjusted buffer DTS (%" GST_TIME_FORMAT ") by %"
536 GST_STIME_FORMAT " to %" GST_TIME_FORMAT, GST_TIME_ARGS (dts),
537 GST_STIME_ARGS (delta_64), GST_TIME_ARGS (GST_BUFFER_DTS (buffer)));
539 GST_TRACE ("Message payload already started; not touching timestamp");
546 gst_rtmp_chunk_stream_parse_payload (GstRtmpChunkStream * cstream,
547 guint32 chunk_size, guint8 ** data)
551 g_return_val_if_fail (cstream, 0);
552 g_return_val_if_fail (cstream->buffer, 0);
554 if (!chunk_stream_is_open (cstream)) {
555 guint32 size = cstream->meta->size;
557 GST_TRACE ("Allocating buffer, payload size %" G_GUINT32_FORMAT, size);
559 mem = gst_allocator_alloc (NULL, size, 0);
561 GST_ERROR ("Failed to allocate buffer for payload size %"
562 G_GUINT32_FORMAT, size);
566 gst_buffer_append_memory (cstream->buffer, mem);
567 gst_buffer_map (cstream->buffer, &cstream->map, GST_MAP_WRITE);
570 g_return_val_if_fail (cstream->map.size == cstream->meta->size, 0);
573 *data = cstream->map.data + cstream->offset;
576 return chunk_stream_next_size (cstream, chunk_size);
580 gst_rtmp_chunk_stream_wrote_payload (GstRtmpChunkStream * cstream,
585 g_return_val_if_fail (cstream, FALSE);
586 g_return_val_if_fail (chunk_stream_is_open (cstream), FALSE);
588 size = chunk_stream_next_size (cstream, chunk_size);
589 cstream->offset += size;
590 cstream->bytes += size;
592 return chunk_stream_next_size (cstream, chunk_size);
596 gst_rtmp_chunk_stream_parse_finish (GstRtmpChunkStream * cstream)
598 GstBuffer *buffer, *empty;
600 g_return_val_if_fail (cstream, NULL);
601 g_return_val_if_fail (cstream->buffer, NULL);
603 buffer = gst_buffer_ref (cstream->buffer);
604 GST_BUFFER_OFFSET_END (buffer) = cstream->bytes;
606 gst_rtmp_buffer_dump (buffer, "<<< message");
608 chunk_stream_clear (cstream);
610 empty = gst_buffer_new ();
612 if (!gst_buffer_copy_into (empty, buffer, GST_BUFFER_COPY_META, 0, 0)) {
613 GST_ERROR ("copy_into failed");
617 GST_BUFFER_DTS (empty) = GST_BUFFER_DTS (buffer);
618 GST_BUFFER_OFFSET (empty) = GST_BUFFER_OFFSET_END (buffer);
620 chunk_stream_take_buffer (cstream, empty);
626 gst_rtmp_chunk_stream_serialize_start (GstRtmpChunkStream * cstream,
627 GstBuffer * buffer, guint32 chunk_size)
631 g_return_val_if_fail (cstream, NULL);
632 g_return_val_if_fail (GST_IS_BUFFER (buffer), NULL);
634 type = select_chunk_type (cstream, buffer);
635 g_return_val_if_fail (type >= 0, NULL);
637 GST_TRACE ("Starting serialization of message %" GST_PTR_FORMAT
638 " into stream %" G_GUINT32_FORMAT, buffer, cstream->id);
640 gst_rtmp_buffer_dump (buffer, ">>> message");
642 chunk_stream_clear (cstream);
643 chunk_stream_take_buffer (cstream, buffer);
645 return serialize_next (cstream, chunk_size, type);
649 gst_rtmp_chunk_stream_serialize_next (GstRtmpChunkStream * cstream,
652 g_return_val_if_fail (cstream, NULL);
653 g_return_val_if_fail (cstream->buffer, NULL);
655 if (chunk_stream_next_size (cstream, chunk_size) == 0) {
656 GST_TRACE ("Message serialization finished");
660 GST_TRACE ("Continuing serialization of message %" GST_PTR_FORMAT
661 " into stream %" G_GUINT32_FORMAT, cstream->buffer, cstream->id);
663 return serialize_next (cstream, chunk_size, CHUNK_TYPE_3);
666 GstRtmpChunkStreams *
667 gst_rtmp_chunk_streams_new (void)
669 GstRtmpChunkStreams *cstreams;
673 cstreams = g_slice_new (GstRtmpChunkStreams);
674 cstreams->array = g_array_new (FALSE, TRUE, sizeof (GstRtmpChunkStream));
675 g_array_set_clear_func (cstreams->array,
676 (GDestroyNotify) gst_rtmp_chunk_stream_clear);
681 gst_rtmp_chunk_streams_free (gpointer ptr)
683 GstRtmpChunkStreams *cstreams = ptr;
684 g_clear_pointer (&cstreams->array, g_array_unref);
685 g_slice_free (GstRtmpChunkStreams, cstreams);
689 gst_rtmp_chunk_streams_get (GstRtmpChunkStreams * cstreams, guint32 id)
692 GstRtmpChunkStream *entry;
695 g_return_val_if_fail (cstreams, NULL);
696 g_return_val_if_fail (id > CHUNK_BYTE_THREEBYTE, NULL);
697 g_return_val_if_fail (id <= CHUNK_STREAM_MAX_THREEBYTE, NULL);
699 array = cstreams->array;
701 for (i = 0; i < array->len; i++) {
702 entry = &g_array_index (array, GstRtmpChunkStream, i);
703 if (entry->id == id) {
704 GST_TRACE ("Obtaining chunk stream %" G_GUINT32_FORMAT, id);
709 GST_DEBUG ("Allocating chunk stream %" G_GUINT32_FORMAT, id);
711 g_array_set_size (array, i + 1);
712 entry = &g_array_index (array, GstRtmpChunkStream, i);