From 206021e4d4985d6e9d60901bd5cc69628775afc6 Mon Sep 17 00:00:00 2001 From: Matthew Waters Date: Thu, 9 Sep 2021 23:43:33 +1000 Subject: [PATCH] rtpmanager/rtx: implement initial support for reading/writing rid extensions Two RTP Header extensions are very relevant for rtprtxsend/receive. 1. "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id": will always be removed 2. "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id": will be written instead of the "rtp-stream-id" header extension. Currently it's only a simple replacement of one header extension for another however a future change would only add the relevant extension based on some heuristics (like, video frames only on one of the rtp key frame buffers, or only until the rtx ssrc has been validated by the peer) in order to reduce the required bandwidth. Part-of: --- .../gst-plugins-good/docs/gst_plugins_cache.json | 42 +++- .../gst/rtpmanager/gstrtprtxreceive.c | 257 ++++++++++++++++++++- .../gst/rtpmanager/gstrtprtxreceive.h | 7 +- .../gst/rtpmanager/gstrtprtxsend.c | 249 +++++++++++++++++++- .../gst/rtpmanager/gstrtprtxsend.h | 8 +- .../gst-plugins-good/tests/check/elements/rtprtx.c | 143 +++++++++++- 6 files changed, 690 insertions(+), 16 deletions(-) diff --git a/subprojects/gst-plugins-good/docs/gst_plugins_cache.json b/subprojects/gst-plugins-good/docs/gst_plugins_cache.json index 4e3ee27..721de61 100644 --- a/subprojects/gst-plugins-good/docs/gst_plugins_cache.json +++ b/subprojects/gst-plugins-good/docs/gst_plugins_cache.json @@ -18375,7 +18375,26 @@ "writable": true } }, - "rank": "none" + "rank": "none", + "signals": { + "add-extension": { + "action": true, + "args": [ + { + "name": "arg0", + "type": "GstRTPHeaderExtension" + } + ], + "return-type": "void", + "when": "last" + }, + "clear-extensions": { + "action": true, + "args": [], + "return-type": "void", + "when": "last" + } + } }, "rtprtxsend": { "author": "Julien Isorce ", @@ -18492,7 +18511,26 @@ "writable": true } }, - "rank": "none" + "rank": "none", + "signals": { + "add-extension": { + "action": true, + "args": [ + { + "name": "arg0", + "type": "GstRTPHeaderExtension" + } + ], + "return-type": "void", + "when": "last" + }, + "clear-extensions": { + "action": true, + "args": [], + "return-type": "void", + "when": "last" + } + } }, "rtpsession": { "author": "Wim Taymans ", diff --git a/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxreceive.c b/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxreceive.c index cfcedca..2dcd2f8 100644 --- a/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxreceive.c +++ b/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxreceive.c @@ -145,7 +145,7 @@ #endif #include -#include +#include #include #include @@ -166,6 +166,19 @@ enum PROP_NUM_RTX_ASSOC_PACKETS }; +enum +{ + SIGNAL_0, + SIGNAL_ADD_EXTENSION, + SIGNAL_CLEAR_EXTENSIONS, + LAST_SIGNAL +}; + +static guint gst_rtp_rtx_receive_signals[LAST_SIGNAL] = { 0, }; + +#define RTPHDREXT_STREAM_ID GST_RTP_HDREXT_BASE "sdes:rtp-stream-id" +#define RTPHDREXT_REPAIRED_STREAM_ID GST_RTP_HDREXT_BASE "sdes:repaired-rtp-stream-id" + static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS, @@ -199,6 +212,40 @@ GST_ELEMENT_REGISTER_DEFINE (rtprtxreceive, "rtprtxreceive", GST_RANK_NONE, GST_TYPE_RTP_RTX_RECEIVE); static void +gst_rtp_rtx_receive_add_extension (GstRtpRtxReceive * rtx, + GstRTPHeaderExtension * ext) +{ + g_return_if_fail (GST_IS_RTP_HEADER_EXTENSION (ext)); + g_return_if_fail (gst_rtp_header_extension_get_id (ext) > 0); + + GST_OBJECT_LOCK (rtx); + if (g_strcmp0 (gst_rtp_header_extension_get_uri (ext), + RTPHDREXT_STREAM_ID) == 0) { + gst_clear_object (&rtx->rid_stream); + rtx->rid_stream = gst_object_ref (ext); + } else if (g_strcmp0 (gst_rtp_header_extension_get_uri (ext), + RTPHDREXT_REPAIRED_STREAM_ID) == 0) { + gst_clear_object (&rtx->rid_repaired); + rtx->rid_repaired = gst_object_ref (ext); + } else { + g_warning ("rtprtxsend (%s) doesn't know how to deal with the " + "RTP Header Extension with URI \'%s\'", GST_OBJECT_NAME (rtx), + gst_rtp_header_extension_get_uri (ext)); + } + /* XXX: check for other duplicate ids? */ + GST_OBJECT_UNLOCK (rtx); +} + +static void +gst_rtp_rtx_receive_clear_extensions (GstRtpRtxReceive * rtx) +{ + GST_OBJECT_LOCK (rtx); + gst_clear_object (&rtx->rid_stream); + gst_clear_object (&rtx->rid_repaired); + GST_OBJECT_UNLOCK (rtx); +} + +static void gst_rtp_rtx_receive_class_init (GstRtpRtxReceiveClass * klass) { GObjectClass *gobject_class; @@ -248,6 +295,38 @@ gst_rtp_rtx_receive_class_init (GstRtpRtxReceiveClass * klass) "correctly associated with retransmission requests", 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + /** + * rtprtxreceive::add-extension: + * + * Add @ext as an extension for writing part of an RTP header extension onto + * outgoing RTP packets. Currently only supports using the following + * extension URIs. All other RTP header extensions are copied as-is. + * - "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id": will be removed + * - "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id": will be + * written instead of the "rtp-stream-id" header extension. + * + * Since: 1.22 + */ + gst_rtp_rtx_receive_signals[SIGNAL_ADD_EXTENSION] = + g_signal_new_class_handler ("add-extension", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_CALLBACK (gst_rtp_rtx_receive_add_extension), NULL, NULL, NULL, + G_TYPE_NONE, 1, GST_TYPE_RTP_HEADER_EXTENSION); + + /** + * rtprtxreceive::clear-extensions: + * @object: the #GstRTPBasePayload + * + * Clear all RTP header extensions used by rtprtxreceive. + * + * Since: 1.22 + */ + gst_rtp_rtx_receive_signals[SIGNAL_CLEAR_EXTENSIONS] = + g_signal_new_class_handler ("clear-extensions", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_CALLBACK (gst_rtp_rtx_receive_clear_extensions), NULL, NULL, NULL, + G_TYPE_NONE, 0); + gst_element_class_add_static_pad_template (gstelement_class, &src_factory); gst_element_class_add_static_pad_template (gstelement_class, &sink_factory); @@ -285,6 +364,11 @@ gst_rtp_rtx_receive_finalize (GObject * object) if (rtx->rtx_pt_map_structure) gst_structure_free (rtx->rtx_pt_map_structure); + gst_clear_object (&rtx->rid_stream); + gst_clear_object (&rtx->rid_repaired); + + gst_clear_buffer (&rtx->dummy_writable); + G_OBJECT_CLASS (gst_rtp_rtx_receive_parent_class)->finalize (object); } @@ -339,6 +423,8 @@ gst_rtp_rtx_receive_init (GstRtpRtxReceive * rtx) NULL, (GDestroyNotify) ssrc_assoc_free); rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal); + + rtx->dummy_writable = gst_buffer_new (); } static gboolean @@ -465,13 +551,169 @@ gst_rtp_rtx_receive_src_event (GstPad * pad, GstObject * parent, return res; } +static GstMemory * +rewrite_header_extensions (GstRtpRtxReceive * rtx, GstRTPBuffer * rtp) +{ + gsize out_size = rtp->size[1] + 32; + guint16 bit_pattern; + guint8 *pdata; + guint wordlen; + GstMemory *mem; + GstMapInfo map; + + mem = gst_allocator_alloc (NULL, out_size, NULL); + + gst_memory_map (mem, &map, GST_MAP_READWRITE); + + if (gst_rtp_buffer_get_extension_data (rtp, &bit_pattern, (gpointer) & pdata, + &wordlen)) { + GstRTPHeaderExtensionFlags ext_flags = 0; + gsize bytelen = wordlen * 4; + guint hdr_unit_bytes; + gsize read_offset = 0, write_offset = 4; + + if (bit_pattern == 0xBEDE) { + /* one byte extensions */ + hdr_unit_bytes = 1; + ext_flags |= GST_RTP_HEADER_EXTENSION_ONE_BYTE; + } else if (bit_pattern >> 4 == 0x100) { + /* two byte extensions */ + hdr_unit_bytes = 2; + ext_flags |= GST_RTP_HEADER_EXTENSION_TWO_BYTE; + } else { + GST_DEBUG_OBJECT (rtx, "unknown extension bit pattern 0x%02x%02x", + bit_pattern >> 8, bit_pattern & 0xff); + goto copy_as_is; + } + + GST_WRITE_UINT16_BE (map.data, bit_pattern); + + while (TRUE) { + guint8 read_id, read_len; + + if (read_offset + hdr_unit_bytes >= bytelen) + /* not enough remaning data */ + break; + + if (ext_flags & GST_RTP_HEADER_EXTENSION_ONE_BYTE) { + read_id = GST_READ_UINT8 (pdata + read_offset) >> 4; + read_len = (GST_READ_UINT8 (pdata + read_offset) & 0x0F) + 1; + read_offset += 1; + + if (read_id == 0) + /* padding */ + continue; + + if (read_id == 15) + /* special id for possible future expansion */ + break; + } else { + read_id = GST_READ_UINT8 (pdata + read_offset); + read_offset += 1; + + if (read_id == 0) + /* padding */ + continue; + + read_len = GST_READ_UINT8 (pdata + read_offset); + read_offset += 1; + } + GST_TRACE_OBJECT (rtx, "found rtp header extension with id %u and " + "length %u", read_id, read_len); + + /* Ignore extension headers where the size does not fit */ + if (read_offset + read_len > bytelen) { + GST_WARNING_OBJECT (rtx, "Extension length extends past the " + "size of the extension data"); + break; + } + + /* rewrite the rtp-stream-id into a repaired-stream-id */ + if (rtx->rid_stream + && read_id == gst_rtp_header_extension_get_id (rtx->rid_repaired)) { + if (!gst_rtp_header_extension_read (rtx->rid_repaired, ext_flags, + &pdata[read_offset], read_len, rtx->dummy_writable)) { + GST_WARNING_OBJECT (rtx, "RTP header extension (%s) could " + "not read payloaded data", GST_OBJECT_NAME (rtx->rid_stream)); + goto copy_as_is; + } + if (rtx->rid_repaired) { + guint8 write_id = gst_rtp_header_extension_get_id (rtx->rid_stream); + gsize written; + char *rid; + + g_object_get (rtx->rid_repaired, "rid", &rid, NULL); + g_object_set (rtx->rid_stream, "rid", rid, NULL); + g_clear_pointer (&rid, g_free); + + written = + gst_rtp_header_extension_write (rtx->rid_stream, rtp->buffer, + ext_flags, rtx->dummy_writable, + &map.data[write_offset + hdr_unit_bytes], + map.size - write_offset - hdr_unit_bytes); + GST_TRACE_OBJECT (rtx->rid_repaired, "wrote %" G_GSIZE_FORMAT, + written); + if (written <= 0) { + GST_WARNING_OBJECT (rtx, "Failed to rewrite RID for RTX"); + goto copy_as_is; + } else { + if (ext_flags & GST_RTP_HEADER_EXTENSION_ONE_BYTE) { + map.data[write_offset] = + ((write_id & 0x0F) << 4) | ((written - 1) & 0x0F); + } else if (ext_flags & GST_RTP_HEADER_EXTENSION_TWO_BYTE) { + map.data[write_offset] = write_id & 0xFF; + map.data[write_offset + 1] = written & 0xFF; + } else { + g_assert_not_reached (); + goto copy_as_is; + } + write_offset += written + hdr_unit_bytes; + } + } + } else { + /* TODO: may need to write mid at different times to the original + * buffer to account for the difference in timing of acknowledgement + * of the rtx ssrc from the original ssrc. This may add extra data to + * the header extension space that needs to be accounted for. + */ + memcpy (&map.data[write_offset], + &map.data[read_offset - hdr_unit_bytes], read_len + hdr_unit_bytes); + write_offset += read_len + hdr_unit_bytes; + } + + read_offset += read_len; + } + + /* subtract the ext header */ + wordlen = write_offset / 4 + ((write_offset % 4) ? 1 : 0); + + /* wordlen in the ext data doesn't include the 4-byte header */ + GST_WRITE_UINT16_BE (map.data + 2, wordlen - 1); + + if (wordlen * 4 > write_offset) + memset (&map.data[write_offset], 0, wordlen * 4 - write_offset); + + GST_MEMDUMP_OBJECT (rtx, "generated ext data", map.data, wordlen * 4); + } else { + copy_as_is: + wordlen = rtp->size[1] / 4; + memcpy (map.data, rtp->data[1], rtp->size[1]); + GST_LOG_OBJECT (rtx, "copying data as-is"); + } + + gst_memory_unmap (mem, &map); + gst_memory_resize (mem, 0, wordlen * 4); + + return mem; +} + /* Copy fixed header and extension. Replace current ssrc by ssrc1, * remove OSN and replace current seq num by OSN. * Copy memory to avoid to manually copy each rtp buffer field. */ static GstBuffer * -_gst_rtp_buffer_new_from_rtx (GstRTPBuffer * rtp, guint32 ssrc1, - guint16 orign_seqnum, guint8 origin_payload_type) +_gst_rtp_buffer_new_from_rtx (GstRtpRtxReceive * rtx, GstRTPBuffer * rtp, + guint32 ssrc1, guint16 orign_seqnum, guint8 origin_payload_type) { GstMemory *mem = NULL; GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT; @@ -486,8 +728,7 @@ _gst_rtp_buffer_new_from_rtx (GstRTPBuffer * rtp, guint32 ssrc1, /* copy extension if any */ if (rtp->size[1]) { - mem = gst_memory_copy (rtp->map[1].memory, - (guint8 *) rtp->data[1] - rtp->map[1].data, rtp->size[1]); + mem = rewrite_header_extensions (rtx, rtp); gst_buffer_append_memory (new_buffer, mem); } @@ -556,6 +797,10 @@ gst_rtp_rtx_receive_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)) goto invalid_buffer; + GST_MEMDUMP_OBJECT (rtx, "rtp header", rtp.map[0].data, rtp.map[0].size); + GST_MEMDUMP_OBJECT (rtx, "rtp ext", rtp.map[1].data, rtp.map[1].size); + GST_MEMDUMP_OBJECT (rtx, "rtp payload", rtp.map[2].data, rtp.map[2].size); + ssrc = gst_rtp_buffer_get_ssrc (&rtp); seqnum = gst_rtp_buffer_get_seq (&rtp); payload_type = gst_rtp_buffer_get_payload_type (&rtp); @@ -690,7 +935,7 @@ gst_rtp_rtx_receive_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) /* create the retransmission packet */ if (is_rtx) new_buffer = - _gst_rtp_buffer_new_from_rtx (&rtp, GPOINTER_TO_UINT (ssrc1), + _gst_rtp_buffer_new_from_rtx (rtx, &rtp, GPOINTER_TO_UINT (ssrc1), orign_seqnum, origin_payload_type); gst_rtp_buffer_unmap (&rtp); diff --git a/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxreceive.h b/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxreceive.h index fd628ba..833c0a2 100644 --- a/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxreceive.h +++ b/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxreceive.h @@ -25,7 +25,7 @@ #define __GST_RTP_RTX_RECEIVE_H__ #include -#include +#include G_BEGIN_DECLS typedef struct _GstRtpRtxReceive GstRtpRtxReceive; @@ -69,6 +69,11 @@ struct _GstRtpRtxReceive guint num_rtx_assoc_packets; GstClockTime last_time; + + GstRTPHeaderExtension *rid_stream; + GstRTPHeaderExtension *rid_repaired; + + GstBuffer *dummy_writable; }; struct _GstRtpRtxReceiveClass diff --git a/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxsend.c b/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxsend.c index 1ebcb22..adad57c 100644 --- a/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxsend.c +++ b/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxsend.c @@ -41,7 +41,6 @@ #endif #include -#include #include #include @@ -66,6 +65,20 @@ enum PROP_CLOCK_RATE_MAP, }; +enum +{ + SIGNAL_0, + SIGNAL_ADD_EXTENSION, + SIGNAL_CLEAR_EXTENSIONS, + LAST_SIGNAL +}; + +static guint gst_rtp_rtx_send_signals[LAST_SIGNAL] = { 0, }; + +#define RTPHDREXT_BUNDLE_MID GST_RTP_HDREXT_BASE "sdes:mid" +#define RTPHDREXT_STREAM_ID GST_RTP_HDREXT_BASE "sdes:rtp-stream-id" +#define RTPHDREXT_REPAIRED_STREAM_ID GST_RTP_HDREXT_BASE "sdes:repaired-rtp-stream-id" + static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS, @@ -103,6 +116,40 @@ static void gst_rtp_rtx_send_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); static void gst_rtp_rtx_send_finalize (GObject * object); +static void +gst_rtp_rtx_send_add_extension (GstRtpRtxSend * rtx, + GstRTPHeaderExtension * ext) +{ + g_return_if_fail (GST_IS_RTP_HEADER_EXTENSION (ext)); + g_return_if_fail (gst_rtp_header_extension_get_id (ext) > 0); + + GST_OBJECT_LOCK (rtx); + if (g_strcmp0 (gst_rtp_header_extension_get_uri (ext), + RTPHDREXT_STREAM_ID) == 0) { + gst_clear_object (&rtx->rid_stream); + rtx->rid_stream = gst_object_ref (ext); + } else if (g_strcmp0 (gst_rtp_header_extension_get_uri (ext), + RTPHDREXT_REPAIRED_STREAM_ID) == 0) { + gst_clear_object (&rtx->rid_repaired); + rtx->rid_repaired = gst_object_ref (ext); + } else { + g_warning ("rtprtxsend (%s) doesn't know how to deal with the " + "RTP Header Extension with URI \'%s\'", GST_OBJECT_NAME (rtx), + gst_rtp_header_extension_get_uri (ext)); + } + /* XXX: check for other duplicate ids? */ + GST_OBJECT_UNLOCK (rtx); +} + +static void +gst_rtp_rtx_send_clear_extensions (GstRtpRtxSend * rtx) +{ + GST_OBJECT_LOCK (rtx); + gst_clear_object (&rtx->rid_stream); + gst_clear_object (&rtx->rid_repaired); + GST_OBJECT_UNLOCK (rtx); +} + G_DEFINE_TYPE_WITH_CODE (GstRtpRtxSend, gst_rtp_rtx_send, GST_TYPE_ELEMENT, GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_send_debug, "rtprtxsend", 0, "rtp retransmission sender")); @@ -258,6 +305,38 @@ gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass) "Map of payload types to their clock rates", GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * rtprtxsend::add-extension: + * + * Add @ext as an extension for writing part of an RTP header extension onto + * outgoing RTP packets. Currently only supports using the following + * extension URIs. All other RTP header extensions are copied as-is. + * - "urn:ietf:params:rtp-hdrext:sdes:rtp-stream-id": will be removed + * - "urn:ietf:params:rtp-hdrext:sdes:repaired-rtp-stream-id": will be + * written instead of the "rtp-stream-id" header extension. + * + * Since: 1.22 + */ + gst_rtp_rtx_send_signals[SIGNAL_ADD_EXTENSION] = + g_signal_new_class_handler ("add-extension", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_CALLBACK (gst_rtp_rtx_send_add_extension), NULL, NULL, NULL, + G_TYPE_NONE, 1, GST_TYPE_RTP_HEADER_EXTENSION); + + /** + * rtprtxsend::clear-extensions: + * @object: the #GstRTPBasePayload + * + * Clear all RTP header extensions used by this rtprtxsend. + * + * Since: 1.22 + */ + gst_rtp_rtx_send_signals[SIGNAL_CLEAR_EXTENSIONS] = + g_signal_new_class_handler ("clear-extensions", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, + G_CALLBACK (gst_rtp_rtx_send_clear_extensions), NULL, NULL, NULL, + G_TYPE_NONE, 0); + gst_element_class_add_static_pad_template (gstelement_class, &src_factory); gst_element_class_add_static_pad_template (gstelement_class, &sink_factory); @@ -299,6 +378,11 @@ gst_rtp_rtx_send_finalize (GObject * object) gst_structure_free (rtx->clock_rate_map_structure); g_object_unref (rtx->queue); + gst_clear_object (&rtx->rid_stream); + gst_clear_object (&rtx->rid_repaired); + + gst_clear_buffer (&rtx->dummy_writable); + G_OBJECT_CLASS (gst_rtp_rtx_send_parent_class)->finalize (object); } @@ -341,6 +425,8 @@ gst_rtp_rtx_send_init (GstRtpRtxSend * rtx) rtx->max_size_time = DEFAULT_MAX_SIZE_TIME; rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS; + + rtx->dummy_writable = gst_buffer_new (); } static gboolean @@ -422,6 +508,162 @@ gst_rtp_rtx_send_get_ssrc_data (GstRtpRtxSend * rtx, guint32 ssrc) return data; } +static GstMemory * +rewrite_header_extensions (GstRtpRtxSend * rtx, GstRTPBuffer * rtp) +{ + gsize out_size = rtp->size[1] + 32; + guint16 bit_pattern; + guint8 *pdata; + guint wordlen; + GstMemory *mem; + GstMapInfo map; + + mem = gst_allocator_alloc (NULL, out_size, NULL); + + gst_memory_map (mem, &map, GST_MAP_READWRITE); + + if (gst_rtp_buffer_get_extension_data (rtp, &bit_pattern, (gpointer) & pdata, + &wordlen)) { + GstRTPHeaderExtensionFlags ext_flags = 0; + gsize bytelen = wordlen * 4; + guint hdr_unit_bytes; + gsize read_offset = 0, write_offset = 4; + + if (bit_pattern == 0xBEDE) { + /* one byte extensions */ + hdr_unit_bytes = 1; + ext_flags |= GST_RTP_HEADER_EXTENSION_ONE_BYTE; + } else if (bit_pattern >> 4 == 0x100) { + /* two byte extensions */ + hdr_unit_bytes = 2; + ext_flags |= GST_RTP_HEADER_EXTENSION_TWO_BYTE; + } else { + GST_DEBUG_OBJECT (rtx, "unknown extension bit pattern 0x%02x%02x", + bit_pattern >> 8, bit_pattern & 0xff); + goto copy_as_is; + } + + GST_WRITE_UINT16_BE (map.data, bit_pattern); + + while (TRUE) { + guint8 read_id, read_len; + + if (read_offset + hdr_unit_bytes >= bytelen) + /* not enough remaning data */ + break; + + if (ext_flags & GST_RTP_HEADER_EXTENSION_ONE_BYTE) { + read_id = GST_READ_UINT8 (pdata + read_offset) >> 4; + read_len = (GST_READ_UINT8 (pdata + read_offset) & 0x0F) + 1; + read_offset += 1; + + if (read_id == 0) + /* padding */ + continue; + + if (read_id == 15) + /* special id for possible future expansion */ + break; + } else { + read_id = GST_READ_UINT8 (pdata + read_offset); + read_offset += 1; + + if (read_id == 0) + /* padding */ + continue; + + read_len = GST_READ_UINT8 (pdata + read_offset); + read_offset += 1; + } + GST_TRACE_OBJECT (rtx, "found rtp header extension with id %u and " + "length %u", read_id, read_len); + + /* Ignore extension headers where the size does not fit */ + if (read_offset + read_len > bytelen) { + GST_WARNING_OBJECT (rtx, "Extension length extends past the " + "size of the extension data"); + break; + } + + /* rewrite the rtp-stream-id into a repaired-stream-id */ + if (rtx->rid_stream + && read_id == gst_rtp_header_extension_get_id (rtx->rid_stream)) { + if (!gst_rtp_header_extension_read (rtx->rid_stream, ext_flags, + &pdata[read_offset], read_len, rtx->dummy_writable)) { + GST_WARNING_OBJECT (rtx, "RTP header extension (%s) could " + "not read payloaded data", GST_OBJECT_NAME (rtx->rid_stream)); + goto copy_as_is; + } + if (rtx->rid_repaired) { + guint8 write_id = gst_rtp_header_extension_get_id (rtx->rid_repaired); + gsize written; + char *rid; + + g_object_get (rtx->rid_stream, "rid", &rid, NULL); + g_object_set (rtx->rid_repaired, "rid", rid, NULL); + g_clear_pointer (&rid, g_free); + + written = + gst_rtp_header_extension_write (rtx->rid_repaired, rtp->buffer, + ext_flags, rtx->dummy_writable, + &map.data[write_offset + hdr_unit_bytes], + map.size - write_offset - hdr_unit_bytes); + GST_TRACE_OBJECT (rtx->rid_repaired, "wrote %" G_GSIZE_FORMAT, + written); + if (written <= 0) { + GST_WARNING_OBJECT (rtx, "Failed to rewrite RID for RTX"); + goto copy_as_is; + } else { + if (ext_flags & GST_RTP_HEADER_EXTENSION_ONE_BYTE) { + map.data[write_offset] = + ((write_id & 0x0F) << 4) | ((written - 1) & 0x0F); + } else if (ext_flags & GST_RTP_HEADER_EXTENSION_TWO_BYTE) { + map.data[write_offset] = write_id & 0xFF; + map.data[write_offset + 1] = written & 0xFF; + } else { + g_assert_not_reached (); + goto copy_as_is; + } + write_offset += written + hdr_unit_bytes; + } + } + } else { + /* TODO: may need to write mid at different times to the original + * buffer to account for the difference in timing of acknowledgement + * of the rtx ssrc from the original ssrc. This may add extra data to + * the header extension space that needs to be accounted for. + */ + memcpy (&map.data[write_offset], + &map.data[read_offset - hdr_unit_bytes], read_len + hdr_unit_bytes); + write_offset += read_len + hdr_unit_bytes; + } + + read_offset += read_len; + } + + /* subtract the ext header */ + wordlen = write_offset / 4 + ((write_offset % 4) ? 1 : 0); + + /* wordlen in the ext data doesn't include the 4-byte header */ + GST_WRITE_UINT16_BE (map.data + 2, wordlen - 1); + + if (wordlen * 4 > write_offset) + memset (&map.data[write_offset], 0, wordlen * 4 - write_offset); + + GST_MEMDUMP_OBJECT (rtx, "generated ext data", map.data, wordlen * 4); + } else { + copy_as_is: + wordlen = rtp->size[1] / 4; + memcpy (map.data, rtp->data[1], rtp->size[1]); + GST_LOG_OBJECT (rtx, "copying data as-is"); + } + + gst_memory_unmap (mem, &map); + gst_memory_resize (mem, 0, wordlen * 4); + + return mem; +} + /* Copy fixed header and extension. Add OSN before to copy payload * Copy memory to avoid to manually copy each rtp buffer field. */ @@ -462,10 +704,7 @@ gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer) /* copy extension if any */ if (rtp.size[1]) { - mem = gst_allocator_alloc (NULL, rtp.size[1], NULL); - gst_memory_map (mem, &map, GST_MAP_WRITE); - memcpy (map.data, rtp.data[1], rtp.size[1]); - gst_memory_unmap (mem, &map); + mem = rewrite_header_extensions (rtx, &rtp); gst_buffer_append_memory (new_buffer, mem); } diff --git a/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxsend.h b/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxsend.h index 5a74609..60a4ec5 100644 --- a/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxsend.h +++ b/subprojects/gst-plugins-good/gst/rtpmanager/gstrtprtxsend.h @@ -25,7 +25,7 @@ #define __GST_RTP_RTX_SEND_H__ #include -#include +#include #include G_BEGIN_DECLS @@ -77,6 +77,12 @@ struct _GstRtpRtxSend /* statistics */ guint num_rtx_requests; guint num_rtx_packets; + + /* list of relevant RTP Header Extensions */ + GstRTPHeaderExtension *rid_stream; + GstRTPHeaderExtension *rid_repaired; + + GstBuffer *dummy_writable; }; struct _GstRtpRtxSendClass diff --git a/subprojects/gst-plugins-good/tests/check/elements/rtprtx.c b/subprojects/gst-plugins-good/tests/check/elements/rtprtx.c index 87b1cc6..6206574 100644 --- a/subprojects/gst-plugins-good/tests/check/elements/rtprtx.c +++ b/subprojects/gst-plugins-good/tests/check/elements/rtprtx.c @@ -20,7 +20,7 @@ */ #include #include -#include +#include #define verify_buf(buf, is_rtx, expected_ssrc, expted_pt, expected_seqnum) \ G_STMT_START { \ @@ -94,6 +94,20 @@ compare_rtp_packets (GstBuffer * a, GstBuffer * b) gst_rtp_buffer_get_payload (&rtp_b), gst_rtp_buffer_get_payload_len (&rtp_a)), 0); + if (gst_rtp_buffer_get_extension (&rtp_a)) { + guint16 ext_bits_a, ext_bits_b; + guint8 *ext_data_a, *ext_data_b; + guint wordlen_a, wordlen_b; + + fail_unless_equals_int (TRUE, gst_rtp_buffer_get_extension_data (&rtp_a, + &ext_bits_a, (gpointer) & ext_data_a, &wordlen_a)); + fail_unless_equals_int (TRUE, gst_rtp_buffer_get_extension_data (&rtp_b, + &ext_bits_b, (gpointer) & ext_data_b, &wordlen_b)); + fail_unless_equals_int (ext_bits_a, ext_bits_b); + fail_unless_equals_int (wordlen_a, wordlen_b); + fail_unless_equals_int (0, memcmp (ext_data_a, ext_data_b, wordlen_a * 4)); + } + gst_rtp_buffer_unmap (&rtp_a); gst_rtp_buffer_unmap (&rtp_b); } @@ -913,6 +927,132 @@ GST_START_TEST (test_rtxsender_clock_rate_map) GST_END_TEST; +#define RTPHDREXT_STREAM_ID GST_RTP_HDREXT_BASE "sdes:rtp-stream-id" +#define RTPHDREXT_REPAIRED_STREAM_ID GST_RTP_HDREXT_BASE "sdes:repaired-rtp-stream-id" + +GST_START_TEST (test_rtxsend_header_extensions) +{ + const guint packets_num = 5; + guint master_ssrc = 1234567; + guint master_pt = 96; + guint rtx_pt = 99; + GstStructure *pt_map; + GstBuffer *inbufs[5]; + GstHarness *hrecv = gst_harness_new ("rtprtxreceive"); + GstHarness *hsend = gst_harness_new ("rtprtxsend"); + GstRTPHeaderExtension *send_stream_id, *send_repaired_stream_id; + GstRTPHeaderExtension *recv_stream_id, *recv_repaired_stream_id; + guint stream_hdr_id = 1, repaired_hdr_id = 2; + gint i; + + pt_map = gst_structure_new ("application/x-rtp-pt-map", + "96", G_TYPE_UINT, rtx_pt, NULL); + g_object_set (hrecv->element, "payload-type-map", pt_map, NULL); + g_object_set (hsend->element, "payload-type-map", pt_map, NULL); + + gst_harness_set_src_caps_str (hsend, "application/x-rtp, " + "media = (string)video, payload = (int)96, " + "ssrc = (uint)1234567, clock-rate = (int)90000, " + "encoding-name = (string)RAW"); + gst_harness_set_src_caps_str (hrecv, "application/x-rtp, " + "media = (string)video, payload = (int)96, " + "ssrc = (uint)1234567, clock-rate = (int)90000, " + "encoding-name = (string)RAW"); + + send_stream_id = + gst_rtp_header_extension_create_from_uri (RTPHDREXT_STREAM_ID); + gst_rtp_header_extension_set_id (send_stream_id, stream_hdr_id); + g_object_set (send_stream_id, "rid", "0", NULL); + fail_unless (send_stream_id != NULL); + g_signal_emit_by_name (hsend->element, "add-extension", send_stream_id); + gst_clear_object (&send_stream_id); + + send_repaired_stream_id = + gst_rtp_header_extension_create_from_uri (RTPHDREXT_REPAIRED_STREAM_ID); + g_object_set (send_repaired_stream_id, "rid", "0", NULL); + gst_rtp_header_extension_set_id (send_repaired_stream_id, repaired_hdr_id); + fail_unless (send_repaired_stream_id != NULL); + g_signal_emit_by_name (hsend->element, "add-extension", + send_repaired_stream_id); + gst_clear_object (&send_repaired_stream_id); + + recv_stream_id = + gst_rtp_header_extension_create_from_uri (RTPHDREXT_STREAM_ID); + gst_rtp_header_extension_set_id (recv_stream_id, stream_hdr_id); + fail_unless (recv_stream_id != NULL); + g_signal_emit_by_name (hrecv->element, "add-extension", recv_stream_id); + gst_clear_object (&recv_stream_id); + + recv_repaired_stream_id = + gst_rtp_header_extension_create_from_uri (RTPHDREXT_REPAIRED_STREAM_ID); + gst_rtp_header_extension_set_id (recv_repaired_stream_id, repaired_hdr_id); + fail_unless (recv_repaired_stream_id != NULL); + g_signal_emit_by_name (hrecv->element, "add-extension", + recv_repaired_stream_id); + gst_clear_object (&recv_repaired_stream_id); + + /* Push 'packets_num' packets through rtxsend to rtxreceive */ + for (i = 0; i < packets_num; ++i) { + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + inbufs[i] = create_rtp_buffer (master_ssrc, master_pt, 100 + i); + fail_unless (gst_rtp_buffer_map (inbufs[i], GST_MAP_READWRITE, &rtp)); + fail_unless (gst_rtp_buffer_add_extension_onebyte_header (&rtp, + stream_hdr_id, "0", 1)); + gst_rtp_buffer_unmap (&rtp); + gst_harness_push (hsend, gst_buffer_ref (inbufs[i])); + gst_harness_push (hrecv, gst_harness_pull (hsend)); + pull_and_verify (hrecv, FALSE, master_ssrc, master_pt, 100 + i); + } + + /* Getting rid of reconfigure event. Preparation before the next step */ + gst_event_unref (gst_harness_pull_upstream_event (hrecv)); + fail_unless_equals_int (gst_harness_upstream_events_in_queue (hrecv), 0); + + /* Push 'packets_num' RTX events through rtxreceive to rtxsend. + Push RTX packets from rtxsend to rtxreceive and + check that the packet produced out of RTX packet is the same + as an original packet */ + for (i = 0; i < packets_num; ++i) { + GstBuffer *outbuf; + gst_harness_push_upstream_event (hrecv, + create_rtx_event (master_ssrc, master_pt, 100 + i)); + gst_harness_push_upstream_event (hsend, + gst_harness_pull_upstream_event (hrecv)); + gst_harness_push (hrecv, gst_harness_pull (hsend)); + + outbuf = gst_harness_pull (hrecv); + compare_rtp_packets (inbufs[i], outbuf); + gst_buffer_unref (inbufs[i]); + gst_buffer_unref (outbuf); + } + + /* Check RTX stats */ + { + guint rtx_requests; + guint rtx_packets; + guint rtx_assoc_packets; + g_object_get (G_OBJECT (hsend->element), + "num-rtx-requests", &rtx_requests, + "num-rtx-packets", &rtx_packets, NULL); + fail_unless_equals_int (rtx_packets, packets_num); + fail_unless_equals_int (rtx_requests, packets_num); + + g_object_get (G_OBJECT (hrecv->element), + "num-rtx-requests", &rtx_requests, + "num-rtx-packets", &rtx_packets, + "num-rtx-assoc-packets", &rtx_assoc_packets, NULL); + fail_unless_equals_int (rtx_packets, packets_num); + fail_unless_equals_int (rtx_requests, packets_num); + fail_unless_equals_int (rtx_assoc_packets, packets_num); + } + + gst_structure_free (pt_map); + gst_harness_teardown (hrecv); + gst_harness_teardown (hsend); +} + +GST_END_TEST; + static Suite * rtprtx_suite (void) { @@ -938,6 +1078,7 @@ rtprtx_suite (void) tcase_add_test (tc_chain, test_rtxqueue_max_size_packets); tcase_add_test (tc_chain, test_rtxqueue_max_size_time); tcase_add_test (tc_chain, test_rtxsender_clock_rate_map); + tcase_add_test (tc_chain, test_rtxsend_header_extensions); return s; } -- 2.7.4