From cff42d4c26ddf268fd50973aa6d086bc18694768 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Tue, 6 Oct 2020 03:13:30 +0200 Subject: [PATCH] rtpmanager: implement SMPTE 2022-1 FEC decoder + improve integration of FEC decoders in rtpbin Part-of: --- docs/gst_plugins_cache.json | 64 ++ gst/rtpmanager/gstrtpbin.c | 301 ++++++++- gst/rtpmanager/gstrtpbin.h | 3 + gst/rtpmanager/gstrtpmanager.c | 5 + gst/rtpmanager/gstrtpst2022-1-fecdec.c | 995 ++++++++++++++++++++++++++++++ gst/rtpmanager/gstrtpst2022-1-fecdec.h | 37 ++ gst/rtpmanager/meson.build | 1 + tests/check/elements/rtpst2022-1-fecdec.c | 444 +++++++++++++ tests/check/meson.build | 1 + 9 files changed, 1843 insertions(+), 8 deletions(-) create mode 100644 gst/rtpmanager/gstrtpst2022-1-fecdec.c create mode 100644 gst/rtpmanager/gstrtpst2022-1-fecdec.h create mode 100644 tests/check/elements/rtpst2022-1-fecdec.c diff --git a/docs/gst_plugins_cache.json b/docs/gst_plugins_cache.json index f8ac35e..b0f18c2 100644 --- a/docs/gst_plugins_cache.json +++ b/docs/gst_plugins_cache.json @@ -16068,6 +16068,11 @@ "klass": "Filter/Network/RTP", "long-name": "RTP Bin", "pad-templates": { + "recv_fec_sink_%%u_%%u": { + "caps": "application/x-rtp:\n", + "direction": "sink", + "presence": "request" + }, "recv_rtcp_sink_%%u": { "caps": "application/x-rtcp:\napplication/x-srtcp:\n", "direction": "sink", @@ -16172,6 +16177,18 @@ "type": "gboolean", "writable": true }, + "fec-decoders": { + "blurb": "GstStructure mapping from session index to FEC decoder factory, eg fec-decoders='fec,0=\"rtpst2022-1-fecdec\\ size-time\\=1000000000\";'", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "application/x-rtp-fec-decoders;", + "mutable": "null", + "readable": true, + "type": "GstStructure", + "writable": true + }, "ignore-pt": { "blurb": "Do not demultiplex based on PT values", "conditionally-available": false, @@ -18227,6 +18244,53 @@ "when": "last" } } + }, + "rtpst2022-1-fecdec": { + "author": "Mathieu Duponchelle ", + "description": "performs FEC as described by SMPTE 2022-1", + "hierarchy": [ + "GstRTPST_2022_1_FecDec", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "SMPTE 2022-1 FEC decoding", + "long-name": "SMPTE 2022-1 FEC decoder", + "pad-templates": { + "fec_%%u": { + "caps": "application/x-rtp:\n", + "direction": "sink", + "presence": "request" + }, + "sink": { + "caps": "application/x-rtp:\n", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "application/x-rtp:\n", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "size-time": { + "blurb": "The amount of data to store (in ns, 0-disable)", + "conditionally-available": false, + "construct": true, + "construct-only": false, + "controllable": false, + "default": "1000000000", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": true + } + }, + "rank": "none" } }, "filename": "gstrtpmanager", diff --git a/gst/rtpmanager/gstrtpbin.c b/gst/rtpmanager/gstrtpbin.c index 1fb98ff..444a66d 100644 --- a/gst/rtpmanager/gstrtpbin.c +++ b/gst/rtpmanager/gstrtpbin.c @@ -165,6 +165,23 @@ static GstStaticPadTemplate rtpbin_recv_rtp_sink_template = GST_STATIC_CAPS ("application/x-rtp;application/x-srtp") ); +/** + * GstRtpBin!recv_fec_sink_%u_%u: + * + * Sink template for receiving Forward Error Correction packets, + * in the form recv_fec_sink__ + * + * See #GstRTPST_2022_1_FecDec for example usage + * + * Since: 1.20 + */ +static GstStaticPadTemplate rtpbin_recv_fec_sink_template = +GST_STATIC_PAD_TEMPLATE ("recv_fec_sink_%u_%u", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS ("application/x-rtp") + ); + static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template = GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%u", GST_PAD_SINK, @@ -344,6 +361,7 @@ enum PROP_MAX_STREAMS, PROP_MAX_TS_OFFSET_ADJUSTMENT, PROP_MAX_TS_OFFSET, + PROP_FEC_DECODERS, }; #define GST_RTP_BIN_RTCP_SYNC_TYPE (gst_rtp_bin_rtcp_sync_get_type()) @@ -377,6 +395,7 @@ static void payload_type_change (GstElement * element, guint pt, GstRtpBinSession * session); static void remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session); static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session); +static void remove_recv_fec (GstRtpBin * rtpbin, GstRtpBinSession * session); static void remove_send_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session); static void remove_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session); static void free_client (GstRtpBinClient * client, GstRtpBin * bin); @@ -486,6 +505,10 @@ struct _GstRtpBinSession GstPad *send_rtp_src_ghost; GstPad *send_rtcp_src; GstPad *send_rtcp_src_ghost; + + GSList *recv_fec_sinks; + GSList *recv_fec_sink_ghosts; + GstElement *fec_decoder; }; /* Manages the RTP streams that come from one client and should therefore be @@ -517,6 +540,12 @@ find_session_by_id (GstRtpBin * rtpbin, gint id) return NULL; } +static gboolean +pad_is_recv_fec (GstRtpBinSession * session, GstPad * pad) +{ + return g_slist_find (session->recv_fec_sink_ghosts, pad) != NULL; +} + /* find a session with the given request pad. Must be called with RTP_BIN_LOCK */ static GstRtpBinSession * find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad) @@ -528,8 +557,8 @@ find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad) if ((sess->recv_rtp_sink_ghost == pad) || (sess->recv_rtcp_sink_ghost == pad) || - (sess->send_rtp_sink_ghost == pad) - || (sess->send_rtcp_src_ghost == pad)) + (sess->send_rtp_sink_ghost == pad) || + (sess->send_rtcp_src_ghost == pad) || pad_is_recv_fec (sess, pad)) return sess; } return NULL; @@ -850,6 +879,7 @@ free_session (GstRtpBinSession * sess, GstRtpBin * bin) remove_recv_rtp (bin, sess); remove_recv_rtcp (bin, sess); + remove_recv_fec (bin, sess); remove_send_rtp (bin, sess); remove_rtcp (bin, sess); @@ -2640,6 +2670,24 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) "changed to 0 (no limit)", 0, G_MAXINT64, DEFAULT_MAX_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstRtpBin:fec-decoders: + * + * Used to provide a factory used to build the FEC decoder for a + * given session, as a command line alternative to + * #GstRtpBin::request-fec-decoder. + * + * Expects a GstStructure in the form session_id (gint) -> factory (string) + * + * Since: 1.20 + */ + g_object_class_install_property (gobject_class, PROP_FEC_DECODERS, + g_param_spec_boxed ("fec-decoders", "Fec Decoders", + "GstStructure mapping from session index to FEC decoder " + "factory, eg " + "fec-decoders='fec,0=\"rtpst2022-1-fecdec\\ size-time\\=1000000000\";'", + GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_bin_change_state); gstelement_class->request_new_pad = GST_DEBUG_FUNCPTR (gst_rtp_bin_request_new_pad); @@ -2649,6 +2697,8 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass) gst_element_class_add_static_pad_template (gstelement_class, &rtpbin_recv_rtp_sink_template); gst_element_class_add_static_pad_template (gstelement_class, + &rtpbin_recv_fec_sink_template); + gst_element_class_add_static_pad_template (gstelement_class, &rtpbin_recv_rtcp_sink_template); gst_element_class_add_static_pad_template (gstelement_class, &rtpbin_send_rtp_sink_template); @@ -2726,6 +2776,8 @@ gst_rtp_bin_init (GstRtpBin * rtpbin) cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ()); rtpbin->sdes = gst_structure_new ("application/x-rtp-source-sdes", "cname", G_TYPE_STRING, cname, "tool", G_TYPE_STRING, "GStreamer", NULL); + rtpbin->fec_decoders = + gst_structure_new_empty ("application/x-rtp-fec-decoders"); g_free (cname); } @@ -2756,6 +2808,9 @@ gst_rtp_bin_finalize (GObject * object) if (rtpbin->sdes) gst_structure_free (rtpbin->sdes); + if (rtpbin->fec_decoders) + gst_structure_free (rtpbin->fec_decoders); + g_mutex_clear (&rtpbin->priv->bin_lock); g_mutex_clear (&rtpbin->priv->dyn_lock); @@ -2788,6 +2843,25 @@ gst_rtp_bin_set_sdes_struct (GstRtpBin * bin, const GstStructure * sdes) GST_RTP_BIN_UNLOCK (bin); } +static void +gst_rtp_bin_set_fec_decoders_struct (GstRtpBin * bin, + const GstStructure * decoders) +{ + if (decoders == NULL) + return; + + GST_RTP_BIN_LOCK (bin); + + GST_OBJECT_LOCK (bin); + if (bin->fec_decoders) + gst_structure_free (bin->fec_decoders); + bin->fec_decoders = gst_structure_copy (decoders); + + GST_OBJECT_UNLOCK (bin); + + GST_RTP_BIN_UNLOCK (bin); +} + static GstStructure * gst_rtp_bin_get_sdes_struct (GstRtpBin * bin) { @@ -2800,6 +2874,18 @@ gst_rtp_bin_get_sdes_struct (GstRtpBin * bin) return result; } +static GstStructure * +gst_rtp_bin_get_fec_decoders_struct (GstRtpBin * bin) +{ + GstStructure *result; + + GST_OBJECT_LOCK (bin); + result = gst_structure_copy (bin->fec_decoders); + GST_OBJECT_UNLOCK (bin); + + return result; +} + static void gst_rtp_bin_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) @@ -2963,6 +3049,9 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id, rtpbin->max_ts_offset = g_value_get_int64 (value); rtpbin->max_ts_offset_is_set = TRUE; break; + case PROP_FEC_DECODERS: + gst_rtp_bin_set_fec_decoders_struct (rtpbin, g_value_get_boxed (value)); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -3057,6 +3146,9 @@ gst_rtp_bin_get_property (GObject * object, guint prop_id, case PROP_MAX_TS_OFFSET: g_value_set_int64 (value, rtpbin->max_ts_offset); break; + case PROP_FEC_DECODERS: + g_value_take_boxed (value, gst_rtp_bin_get_fec_decoders_struct (rtpbin)); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -3343,6 +3435,48 @@ copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data) return TRUE; } +static gboolean +ensure_fec_decoder (GstRtpBin * rtpbin, GstRtpBinSession * session) +{ + const gchar *factory; + gchar *sess_id_str; + + if (session->fec_decoder) + goto done; + + sess_id_str = g_strdup_printf ("%u", session->id); + factory = gst_structure_get_string (rtpbin->fec_decoders, sess_id_str); + g_free (sess_id_str); + + /* First try the property */ + if (factory) { + GError *err = NULL; + + session->fec_decoder = + gst_parse_bin_from_description_full (factory, TRUE, NULL, + GST_PARSE_FLAG_NO_SINGLE_ELEMENT_BINS | GST_PARSE_FLAG_FATAL_ERRORS, + &err); + if (!session->fec_decoder) { + GST_ERROR_OBJECT (rtpbin, "Failed to build decoder from factory: %s", + err->message); + } + + bin_manage_element (session->bin, session->fec_decoder); + session->elements = + g_slist_prepend (session->elements, session->fec_decoder); + GST_INFO_OBJECT (rtpbin, "Built FEC decoder: %" GST_PTR_FORMAT + " for session %u", session->fec_decoder, session->id); + } + + /* Fallback to the signal */ + if (!session->fec_decoder) + session->fec_decoder = + session_request_element (session, SIGNAL_REQUEST_FEC_DECODER); + +done: + return session->fec_decoder != NULL; +} + static void expose_recv_src_pad (GstRtpBin * rtpbin, GstPad * pad, GstRtpBinStream * stream, guint8 pt) @@ -3354,11 +3488,9 @@ expose_recv_src_pad (GstRtpBin * rtpbin, GstPad * pad, GstRtpBinStream * stream, gst_object_ref (pad); - if (stream->session->storage) { - GstElement *fec_decoder = - session_request_element (stream->session, SIGNAL_REQUEST_FEC_DECODER); - - if (fec_decoder) { + if (stream->session->storage && !stream->session->fec_decoder) { + if (ensure_fec_decoder (rtpbin, stream->session)) { + GstElement *fec_decoder = stream->session->fec_decoder; GstPad *sinkpad, *srcpad; GstPadLinkReturn ret; @@ -3594,6 +3726,15 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, padname = g_strdup_printf ("src_%u", ssrc); srcpad = gst_element_get_static_pad (element, padname); g_free (padname); + + if (session->fec_decoder) { + sinkpad = gst_element_get_static_pad (session->fec_decoder, "sink"); + gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING); + gst_object_unref (sinkpad); + gst_object_unref (srcpad); + srcpad = gst_element_get_static_pad (session->fec_decoder, "src"); + } + sinkpad = gst_element_get_static_pad (stream->buffer, "sink"); gst_pad_link_full (srcpad, sinkpad, GST_PAD_LINK_CHECK_NOTHING); gst_object_unref (sinkpad); @@ -3934,6 +4075,41 @@ remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session) } static GstPad * +complete_session_fec (GstRtpBin * rtpbin, GstRtpBinSession * session, + guint fec_idx) +{ + gchar *padname; + GstPad *ret; + + if (!ensure_fec_decoder (rtpbin, session)) + goto no_decoder; + + GST_DEBUG_OBJECT (rtpbin, "getting FEC sink pad"); + padname = g_strdup_printf ("fec_%u", fec_idx); + ret = gst_element_get_request_pad (session->fec_decoder, padname); + g_free (padname); + + if (ret == NULL) + goto pad_failed; + + session->recv_fec_sinks = g_slist_prepend (session->recv_fec_sinks, ret); + + return ret; + +pad_failed: + { + g_warning ("rtpbin: failed to get decoder fec pad"); + return NULL; + } +no_decoder: + { + g_warning ("rtpbin: failed to build FEC decoder for session %u", + session->id); + return NULL; + } +} + +static GstPad * complete_session_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session, guint sessid) { @@ -4076,6 +4252,66 @@ create_error: } } +static GstPad * +create_recv_fec (GstRtpBin * rtpbin, GstPadTemplate * templ, const gchar * name) +{ + guint sessid, fec_idx; + GstRtpBinSession *session; + GstPad *decsink = NULL; + GstPad *ghost; + + /* first get the session number */ + if (name == NULL + || sscanf (name, "recv_fec_sink_%u_%u", &sessid, &fec_idx) != 2) + goto no_name; + + if (fec_idx > 1) + goto invalid_idx; + + GST_DEBUG_OBJECT (rtpbin, "finding session %u", sessid); + + /* get or create the session */ + session = find_session_by_id (rtpbin, sessid); + if (!session) { + GST_DEBUG_OBJECT (rtpbin, "creating session %u", sessid); + /* create session now */ + session = create_session (rtpbin, sessid); + if (session == NULL) + goto create_error; + } + + decsink = complete_session_fec (rtpbin, session, fec_idx); + if (!decsink) + goto create_error; + + ghost = gst_ghost_pad_new_from_template (name, decsink, templ); + session->recv_fec_sink_ghosts = + g_slist_prepend (session->recv_fec_sink_ghosts, ghost); + gst_object_unref (decsink); + gst_pad_set_active (ghost, TRUE); + gst_element_add_pad (GST_ELEMENT_CAST (rtpbin), ghost); + + return ghost; + + /* ERRORS */ +no_name: + { + g_warning ("rtpbin: cannot find session id for pad: %s", + GST_STR_NULL (name)); + return NULL; + } +invalid_idx: + { + g_warning ("rtpbin: invalid FEC index: %s", GST_STR_NULL (name)); + return NULL; + } +create_error: + { + /* create_session already warned */ + return NULL; + } +} + static void remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session) { @@ -4097,6 +4333,49 @@ remove_recv_rtcp (GstRtpBin * rtpbin, GstRtpBinSession * session) } } +static void +remove_recv_fec_for_pad (GstRtpBin * rtpbin, GstRtpBinSession * session, + GstPad * ghost) +{ + GSList *item; + GstPad *target; + + target = gst_ghost_pad_get_target (GST_GHOST_PAD (ghost)); + + if (target) { + item = g_slist_find (session->recv_fec_sinks, target); + if (item) { + gst_element_release_request_pad (session->fec_decoder, item->data); + session->recv_fec_sinks = + g_slist_delete_link (session->recv_fec_sinks, item); + } + gst_object_unref (target); + } + + item = g_slist_find (session->recv_fec_sink_ghosts, ghost); + if (item) + session->recv_fec_sink_ghosts = + g_slist_delete_link (session->recv_fec_sink_ghosts, item); + + gst_pad_set_active (ghost, FALSE); + gst_element_remove_pad (GST_ELEMENT_CAST (rtpbin), ghost); +} + +static void +remove_recv_fec (GstRtpBin * rtpbin, GstRtpBinSession * session) +{ + GSList *copy; + GSList *tmp; + + copy = g_slist_copy (session->recv_fec_sink_ghosts); + + for (tmp = copy; tmp; tmp = tmp->next) { + remove_recv_fec_for_pad (rtpbin, session, (GstPad *) tmp->data); + } + + g_slist_free (copy); +} + static gboolean complete_session_src (GstRtpBin * rtpbin, GstRtpBinSession * session) { @@ -4699,6 +4978,9 @@ gst_rtp_bin_request_new_pad (GstElement * element, } else if (templ == gst_element_class_get_pad_template (klass, "send_rtcp_src_%u")) { result = create_send_rtcp (rtpbin, templ, pad_name); + } else if (templ == gst_element_class_get_pad_template (klass, + "recv_fec_sink_%u_%u")) { + result = create_recv_fec (rtpbin, templ, pad_name); } else goto wrong_template; @@ -4743,13 +5025,16 @@ gst_rtp_bin_release_pad (GstElement * element, GstPad * pad) remove_send_rtp (rtpbin, session); } else if (session->send_rtcp_src_ghost == pad) { remove_rtcp (rtpbin, session); + } else if (pad_is_recv_fec (session, pad)) { + remove_recv_fec_for_pad (rtpbin, session, pad); } /* no more request pads, free the complete session */ if (session->recv_rtp_sink_ghost == NULL && session->recv_rtcp_sink_ghost == NULL && session->send_rtp_sink_ghost == NULL - && session->send_rtcp_src_ghost == NULL) { + && session->send_rtcp_src_ghost == NULL + && session->recv_fec_sink_ghosts == NULL) { GST_DEBUG_OBJECT (rtpbin, "no more pads for session %p", session); rtpbin->sessions = g_slist_remove (rtpbin->sessions, session); free_session (session, rtpbin); diff --git a/gst/rtpmanager/gstrtpbin.h b/gst/rtpmanager/gstrtpbin.h index fcea7ce..58de860 100644 --- a/gst/rtpmanager/gstrtpbin.h +++ b/gst/rtpmanager/gstrtpbin.h @@ -88,6 +88,9 @@ struct _GstRtpBin { /* the default SDES items for sessions */ GstStructure *sdes; + /* the default FEC decoder factories for sessions */ + GstStructure *fec_decoders; + /*< private >*/ GstRtpBinPrivate *priv; }; diff --git a/gst/rtpmanager/gstrtpmanager.c b/gst/rtpmanager/gstrtpmanager.c index 4ba624f..91b6b65 100644 --- a/gst/rtpmanager/gstrtpmanager.c +++ b/gst/rtpmanager/gstrtpmanager.c @@ -32,6 +32,7 @@ #include "gstrtpdtmfmux.h" #include "gstrtpmux.h" #include "gstrtpfunnel.h" +#include "gstrtpst2022-1-fecdec.h" static gboolean plugin_init (GstPlugin * plugin) @@ -74,6 +75,10 @@ plugin_init (GstPlugin * plugin) GST_TYPE_RTP_FUNNEL)) return FALSE; + if (!gst_element_register (plugin, "rtpst2022-1-fecdec", GST_RANK_NONE, + GST_TYPE_RTPST_2022_1_FECDEC)) + return FALSE; + return TRUE; } diff --git a/gst/rtpmanager/gstrtpst2022-1-fecdec.c b/gst/rtpmanager/gstrtpst2022-1-fecdec.c new file mode 100644 index 0000000..7f45b53 --- /dev/null +++ b/gst/rtpmanager/gstrtpst2022-1-fecdec.c @@ -0,0 +1,995 @@ +/* GStreamer + * Copyright (C) <2020> Mathieu Duponchelle + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-rtpst2022-1-fecdec + * @see_also: #element-rtpst2022-1-fecenc + * + * This element takes as input a media stream and up to two FEC + * streams as described in SMPTE 2022-1: Forward Error Correction + * for Real-Time Video/Audio Transport Over IP Networks, and makes + * use of the FEC packets to recover media packets that may have + * gotten lost. + * + * ## Design + * + * The approach picked for this element is to proactively reconstruct missing + * packets as soon as possible. When a FEC packet arrives, the element + * immediately checks whether a media packet in the row / column it protects + * can be reconstructed. + * + * Similarly, when a media packet comes in, the element checks whether it has + * already received a corresponding packet in both the column and row the packet + * belongs to, and if so goes through the first step listed above. + * + * This process is repeated recursively, allowing for recoveries over one + * dimension to unblock recoveries over the other. + * + * In perfect networking conditions, this incurs next to no overhead as FEC + * packets will arrive after the media packets, causing no reconstruction to + * take place, just a few checks upon chaining. + * + * ## sender / receiver example + * + * ``` shell + * gst-launch-1.0 \ + * rtpbin name=rtp fec-encoders='fec,0="rtpst2022-1-fecenc\ rows\=5\ columns\=5";' \ + * uridecodebin uri=file:///path/to/video/file ! x264enc key-int-max=60 tune=zerolatency ! \ + * queue ! mpegtsmux ! rtpmp2tpay ssrc=0 ! rtp.send_rtp_sink_0 \ + * rtp.send_rtp_src_0 ! udpsink host=127.0.0.1 port=5000 \ + * rtp.send_fec_src_0_0 ! udpsink host=127.0.0.1 port=5002 async=false \ + * rtp.send_fec_src_0_1 ! udpsink host=127.0.0.1 port=5004 async=false + * ``` + * + * ``` shell + * gst-launch-1.0 \ + * rtpbin latency=500 fec-decoders='fec,0="rtpst2022-1-fecdec\ size-time\=1000000000";' name=rtp \ + * udpsrc address=127.0.0.1 port=5002 caps="application/x-rtp, payload=96" ! queue ! rtp.recv_fec_sink_0_0 \ + * udpsrc address=127.0.0.1 port=5004 caps="application/x-rtp, payload=96" ! queue ! rtp.recv_fec_sink_0_1 \ + * udpsrc address=127.0.0.1 port=5000 caps="application/x-rtp, media=video, clock-rate=90000, encoding-name=mp2t, payload=33" ! \ + * queue ! netsim drop-probability=0.05 ! rtp.recv_rtp_sink_0 \ + * rtp. ! decodebin ! videoconvert ! queue ! autovideosink + * ``` + * + * With the above command line, as the media packet size is constant, + * the fec overhead can be approximated to the number of fec packets + * per 2-d matrix of media packet, here 10 fec packets for each 25 + * media packets. + * + * Increasing the number of rows and columns will decrease the overhead, + * but obviously increase the likelihood of recovery failure for lost + * packets on the receiver side. + * + * Since: 1.20 + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include + +#include "gstrtpst2022-1-fecdec.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rtpst_2022_1_fecdec_debug); +#define GST_CAT_DEFAULT gst_rtpst_2022_1_fecdec_debug + +#define DEFAULT_SIZE_TIME (GST_SECOND) + +typedef struct +{ + guint16 seq; + GstBuffer *buffer; +} Item; + +static GstFlowReturn store_media_item (GstRTPST_2022_1_FecDec * dec, + GstRTPBuffer * rtp, Item * item); + +static void +free_item (Item * item) +{ + gst_buffer_unref (item->buffer); + item->buffer = NULL; + g_free (item); +} + +static gint +cmp_items (Item * a, Item * b, gpointer unused) +{ + return gst_rtp_buffer_compare_seqnum (b->seq, a->seq); +} + +enum +{ + PROP_0, + PROP_SIZE_TIME, +}; + +struct _GstRTPST_2022_1_FecDecClass +{ + GstElementClass class; +}; + +struct _GstRTPST_2022_1_FecDec +{ + GstElement element; + + GstPad *srcpad; + GstPad *sinkpad; + GList *fec_sinkpads; + + /* All the following field are protected by the OBJECT_LOCK */ + GSequence *packets; + GHashTable *column_fec_packets; + GSequence *fec_packets[2]; + /* N columns */ + guint l; + /* N rows */ + guint d; + + GstClockTime size_time; + GstClockTime max_arrival_time; + GstClockTime max_fec_arrival_time[2]; +}; + +#define RTP_CAPS "application/x-rtp" + +typedef struct +{ + guint16 seq; + guint16 len; + guint8 E; + guint8 pt; + guint32 mask; + guint32 timestamp; + guint8 N; + guint8 D; + guint8 type; + guint8 index; + guint8 offset; + guint8 NA; + guint8 seq_ext; + guint8 *payload; + guint payload_len; +} Rtp2DFecHeader; + +static GstStaticPadTemplate fec_sink_template = +GST_STATIC_PAD_TEMPLATE ("fec_%u", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS (RTP_CAPS)); + +static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS (RTP_CAPS)); + +static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS (RTP_CAPS)); + +#define gst_rtpst_2022_1_fecdec_parent_class parent_class +G_DEFINE_TYPE (GstRTPST_2022_1_FecDec, gst_rtpst_2022_1_fecdec, + GST_TYPE_ELEMENT); + +static void +trim_items (GstRTPST_2022_1_FecDec * dec) +{ + GSequenceIter *tmp_iter, *iter = NULL; + + for (tmp_iter = g_sequence_get_begin_iter (dec->packets); + tmp_iter; tmp_iter = g_sequence_iter_next (tmp_iter)) { + Item *item; + + if (g_sequence_iter_is_end (tmp_iter)) + break; + + item = g_sequence_get (tmp_iter); + + if (dec->max_arrival_time - GST_BUFFER_DTS_OR_PTS (item->buffer) < + dec->size_time) + break; + + iter = tmp_iter; + } + + if (iter) { + Item *item = g_sequence_get (iter); + GST_TRACE_OBJECT (dec, + "Trimming packets up to %" GST_TIME_FORMAT " (seq: %u)", + GST_TIME_ARGS (GST_BUFFER_DTS_OR_PTS (item->buffer)), item->seq); + g_sequence_remove_range (g_sequence_get_begin_iter (dec->packets), iter); + } +} + +static void +trim_fec_items (GstRTPST_2022_1_FecDec * dec, guint D) +{ + GSequenceIter *tmp_iter, *iter = NULL; + + for (tmp_iter = g_sequence_get_begin_iter (dec->fec_packets[D]); + tmp_iter; tmp_iter = g_sequence_iter_next (tmp_iter)) { + Item *item; + + if (g_sequence_iter_is_end (tmp_iter)) + break; + + item = g_sequence_get (tmp_iter); + + if (dec->max_fec_arrival_time[D] - GST_BUFFER_DTS_OR_PTS (item->buffer) < + dec->size_time) + break; + + if (!D) { + guint i; + guint16 seq; + + for (i = 0; i < dec->d; i++) { + seq = item->seq + i * dec->l; + g_hash_table_remove (dec->column_fec_packets, GUINT_TO_POINTER (seq)); + } + } + + iter = tmp_iter; + } + + if (iter) { + Item *item = g_sequence_get (iter); + GST_TRACE_OBJECT (dec, + "Trimming %s FEC packets up to %" GST_TIME_FORMAT " (seq: %u)", + D ? "row" : "column", + GST_TIME_ARGS (GST_BUFFER_DTS_OR_PTS (item->buffer)), item->seq); + g_sequence_remove_range (g_sequence_get_begin_iter (dec->fec_packets[D]), + iter); + } +} + +static Item * +lookup_media_packet (GstRTPST_2022_1_FecDec * dec, guint16 seqnum) +{ + GSequenceIter *iter; + Item *ret = NULL; + Item dummy = { seqnum, NULL }; + + iter = + g_sequence_lookup (dec->packets, &dummy, (GCompareDataFunc) cmp_items, + NULL); + + if (iter) + ret = g_sequence_get (iter); + + return ret; +} + +static gboolean +parse_header (Rtp2DFecHeader * fec, guint8 * data, guint len) +{ + gboolean ret = FALSE; + GstBitReader bits; + + if (len < 16) + goto done; + + gst_bit_reader_init (&bits, data, len); + + fec->seq = gst_bit_reader_get_bits_uint16_unchecked (&bits, 16); + fec->len = gst_bit_reader_get_bits_uint16_unchecked (&bits, 16); + fec->E = gst_bit_reader_get_bits_uint8_unchecked (&bits, 1); + fec->pt = gst_bit_reader_get_bits_uint8_unchecked (&bits, 7); + fec->mask = gst_bit_reader_get_bits_uint32_unchecked (&bits, 24); + fec->timestamp = gst_bit_reader_get_bits_uint32_unchecked (&bits, 32); + fec->N = gst_bit_reader_get_bits_uint8_unchecked (&bits, 1); + fec->D = gst_bit_reader_get_bits_uint8_unchecked (&bits, 1); + fec->type = gst_bit_reader_get_bits_uint8_unchecked (&bits, 3); + fec->index = gst_bit_reader_get_bits_uint8_unchecked (&bits, 3); + fec->offset = gst_bit_reader_get_bits_uint8_unchecked (&bits, 8); + fec->NA = gst_bit_reader_get_bits_uint8_unchecked (&bits, 8); + fec->seq_ext = gst_bit_reader_get_bits_uint8_unchecked (&bits, 8); + fec->payload = data + 16; + fec->payload_len = len - 16; + + ret = TRUE; + +done: + return ret; +} + +static Item * +get_row_fec (GstRTPST_2022_1_FecDec * dec, guint16 seqnum) +{ + GSequenceIter *iter; + Item *ret = NULL; + Item dummy = { 0, }; + + if (dec->l == G_MAXUINT) + goto done; + + /* Potential underflow is intended */ + dummy.seq = seqnum - dec->l; + + iter = + g_sequence_search (dec->fec_packets[1], &dummy, + (GCompareDataFunc) cmp_items, NULL); + + if (!g_sequence_iter_is_end (iter)) { + gint seqdiff; + ret = g_sequence_get (iter); + + seqdiff = gst_rtp_buffer_compare_seqnum (ret->seq, seqnum); + + /* Now check whether the fec packet does apply */ + if (seqdiff < 0 || seqdiff >= dec->l) + ret = NULL; + } + +done: + return ret; +} + +static Item * +get_column_fec (GstRTPST_2022_1_FecDec * dec, guint16 seqnum) +{ + Item *ret = NULL; + + if (dec->l == G_MAXUINT || dec->d == G_MAXUINT) + goto done; + + ret = + g_hash_table_lookup (dec->column_fec_packets, GUINT_TO_POINTER (seqnum)); + +done: + return ret; +} + +static void +_xor_mem (guint8 * restrict dst, const guint8 * restrict src, gsize length) +{ + guint i; + + for (i = 0; i < (length / sizeof (guint64)); ++i) { +#if G_BYTE_ORDER == G_LITTLE_ENDIAN + GST_WRITE_UINT64_LE (dst, + GST_READ_UINT64_LE (dst) ^ GST_READ_UINT64_LE (src)); +#else + GST_WRITE_UINT64_BE (dst, + GST_READ_UINT64_BE (dst) ^ GST_READ_UINT64_BE (src)); +#endif + dst += sizeof (guint64); + src += sizeof (guint64); + } + for (i = 0; i < (length % sizeof (guint64)); ++i) + dst[i] ^= src[i]; +} + +static GstFlowReturn +xor_items (GstRTPST_2022_1_FecDec * dec, Rtp2DFecHeader * fec, GList * packets, + guint16 seqnum) +{ + guint8 *xored; + guint32 xored_timestamp; + guint8 xored_pt; + guint16 xored_payload_len; + Item *item; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + GList *tmp; + GstFlowReturn ret = GST_FLOW_OK; + GstBuffer *buffer; + + /* Figure out the recovered packet length first */ + xored_payload_len = fec->len; + for (tmp = packets; tmp; tmp = tmp->next) { + GstRTPBuffer media_rtp = GST_RTP_BUFFER_INIT; + Item *item = (Item *) tmp->data; + + gst_rtp_buffer_map (item->buffer, GST_MAP_READ, &media_rtp); + xored_payload_len ^= gst_rtp_buffer_get_payload_len (&media_rtp); + gst_rtp_buffer_unmap (&media_rtp); + } + + if (xored_payload_len > fec->payload_len) { + GST_WARNING_OBJECT (dec, "FEC payload len %u < length recovery %u", + fec->payload_len, xored_payload_len); + goto done; + } + + item = g_malloc0 (sizeof (Item)); + item->seq = seqnum; + item->buffer = gst_rtp_buffer_new_allocate (xored_payload_len, 0, 0); + gst_rtp_buffer_map (item->buffer, GST_MAP_WRITE, &rtp); + + xored = gst_rtp_buffer_get_payload (&rtp); + memcpy (xored, fec->payload, xored_payload_len); + xored_timestamp = fec->timestamp; + xored_pt = fec->pt; + + for (tmp = packets; tmp; tmp = tmp->next) { + GstRTPBuffer media_rtp = GST_RTP_BUFFER_INIT; + Item *item = (Item *) tmp->data; + + gst_rtp_buffer_map (item->buffer, GST_MAP_READ, &media_rtp); + _xor_mem (xored, gst_rtp_buffer_get_payload (&media_rtp), + gst_rtp_buffer_get_payload_len (&media_rtp)); + xored_timestamp ^= gst_rtp_buffer_get_timestamp (&media_rtp); + xored_pt ^= gst_rtp_buffer_get_payload_type (&media_rtp); + + gst_rtp_buffer_unmap (&media_rtp); + } + + GST_DEBUG_OBJECT (dec, + "Recovered buffer through %s FEC with seqnum %u, payload len %u and timestamp %u", + fec->D ? "row" : "column", seqnum, xored_payload_len, xored_timestamp); + + GST_BUFFER_DTS (item->buffer) = dec->max_arrival_time; + + gst_rtp_buffer_set_timestamp (&rtp, xored_timestamp); + gst_rtp_buffer_set_seq (&rtp, seqnum); + gst_rtp_buffer_set_payload_type (&rtp, xored_pt); + + gst_rtp_buffer_unmap (&rtp); + + /* Store a ref on item->buffer as store_media_item may + * recurse and call this method again, potentially releasing + * the object lock and leaving our item unprotected in + * dec->packets + */ + buffer = gst_buffer_ref (item->buffer); + + /* It is right that we should celebrate, + * for your brother was dead, and is alive again */ + gst_rtp_buffer_map (item->buffer, GST_MAP_READ, &rtp); + ret = store_media_item (dec, &rtp, item); + gst_rtp_buffer_unmap (&rtp); + + if (ret == GST_FLOW_OK) { + /* Unlocking here is safe */ + GST_OBJECT_UNLOCK (dec); + ret = gst_pad_push (dec->srcpad, buffer); + GST_OBJECT_LOCK (dec); + } else { + gst_buffer_unref (buffer); + } + +done: + return ret; +} + +/* Returns a flow value if we should discard the packet, GST_FLOW_CUSTOM_SUCCESS otherwise */ +static GstFlowReturn +check_fec (GstRTPST_2022_1_FecDec * dec, Rtp2DFecHeader * fec) +{ + GList *packets = NULL; + gint missing_seq = -1; + guint n_packets = 0; + guint required_n_packets; + GstFlowReturn ret = GST_FLOW_OK; + + if (fec->D) { + guint i = 0; + + required_n_packets = dec->l; + + for (i = 0; i < dec->l; i++) { + Item *item = lookup_media_packet (dec, fec->seq + i); + + if (item) { + packets = g_list_prepend (packets, item); + n_packets += 1; + } else { + missing_seq = fec->seq + i; + } + } + } else { + guint i = 0; + + required_n_packets = dec->d; + + for (i = 0; i < dec->d; i++) { + Item *item = lookup_media_packet (dec, fec->seq + i * dec->l); + + if (item) { + packets = g_list_prepend (packets, item); + n_packets += 1; + } else { + missing_seq = fec->seq + i * dec->l; + } + } + } + + if (n_packets == required_n_packets) { + g_assert (missing_seq == -1); + GST_LOG_OBJECT (dec, + "All media packets present, we can discard that FEC packet"); + } else if (n_packets + 1 == required_n_packets) { + g_assert (missing_seq != -1); + ret = xor_items (dec, fec, packets, missing_seq); + GST_LOG_OBJECT (dec, "We have enough info to reconstruct %u", missing_seq); + } else { + ret = GST_FLOW_CUSTOM_SUCCESS; + GST_LOG_OBJECT (dec, "Too many media packets missing, storing FEC packet"); + } + g_list_free (packets); + + return ret; +} + +static GstFlowReturn +check_fec_item (GstRTPST_2022_1_FecDec * dec, Item * item) +{ + Rtp2DFecHeader fec; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + guint payload_len; + guint8 *payload; + GstFlowReturn ret; + + gst_rtp_buffer_map (item->buffer, GST_MAP_READ, &rtp); + + payload_len = gst_rtp_buffer_get_payload_len (&rtp); + payload = gst_rtp_buffer_get_payload (&rtp); + + parse_header (&fec, payload, payload_len); + + ret = check_fec (dec, &fec); + + gst_rtp_buffer_unmap (&rtp); + + return ret; +} + +static GstFlowReturn +store_media_item (GstRTPST_2022_1_FecDec * dec, GstRTPBuffer * rtp, Item * item) +{ + GstFlowReturn ret = GST_FLOW_OK; + Item *fec_item; + guint16 seq; + + seq = gst_rtp_buffer_get_seq (rtp); + + g_sequence_insert_sorted (dec->packets, item, (GCompareDataFunc) cmp_items, + NULL); + + if ((fec_item = get_row_fec (dec, seq))) { + ret = check_fec_item (dec, fec_item); + if (ret == GST_FLOW_CUSTOM_SUCCESS) + ret = GST_FLOW_OK; + } + + if (ret == GST_FLOW_OK && (fec_item = get_column_fec (dec, seq))) { + ret = check_fec_item (dec, fec_item); + if (ret == GST_FLOW_CUSTOM_SUCCESS) + ret = GST_FLOW_OK; + } + + return ret; +} + +static GstFlowReturn +store_media (GstRTPST_2022_1_FecDec * dec, GstRTPBuffer * rtp, + GstBuffer * buffer) +{ + Item *item; + guint16 seq; + + seq = gst_rtp_buffer_get_seq (rtp); + item = g_malloc0 (sizeof (Item)); + item->buffer = gst_buffer_ref (buffer); + item->seq = seq; + + return store_media_item (dec, rtp, item); +} + +static GstFlowReturn +gst_rtpst_2022_1_fecdec_sink_chain_fec (GstPad * pad, GstObject * parent, + GstBuffer * buffer) +{ + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (parent); + Rtp2DFecHeader fec = { 0, }; + guint payload_len; + guint8 *payload; + GstFlowReturn ret = GST_FLOW_OK; + Item *item; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + + GST_OBJECT_LOCK (dec); + + if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)) { + GST_WARNING_OBJECT (pad, "Chained FEC buffer isn't valid RTP"); + goto discard; + } + + payload_len = gst_rtp_buffer_get_payload_len (&rtp); + payload = gst_rtp_buffer_get_payload (&rtp); + + if (!parse_header (&fec, payload, payload_len)) { + GST_WARNING_OBJECT (pad, "Failed to parse FEC header (payload len: %d)", + payload_len); + GST_MEMDUMP_OBJECT (pad, "Invalid payload", payload, payload_len); + goto discard; + } + + GST_TRACE_OBJECT + (pad, + "Handling FEC buffer with SNBase / N / D / NA / offset %u / %u / %u / %u / %u", + fec.seq, fec.N, fec.D, fec.NA, fec.offset); + + if (fec.D) { + if (dec->l == G_MAXUINT) { + dec->l = fec.NA; + } else if (fec.NA != dec->l) { + GST_WARNING_OBJECT (dec, "2D FEC dimensionality cannot change"); + goto discard; + } + + if (fec.offset != 1) { + GST_WARNING_OBJECT (pad, "offset must be 1 for row FEC packets"); + goto discard; + } + } else { + if (dec->d == G_MAXUINT) { + dec->d = fec.NA; + } else if (fec.NA != dec->d) { + GST_WARNING_OBJECT (dec, "2D FEC dimensionality cannot change"); + goto discard; + } + + if (dec->l == G_MAXUINT) { + dec->l = fec.offset; + } else if (fec.offset != dec->l) { + GST_WARNING_OBJECT (dec, "2D FEC dimensionality cannot change"); + goto discard; + } + } + + dec->max_fec_arrival_time[fec.D] = GST_BUFFER_DTS_OR_PTS (buffer); + trim_fec_items (dec, fec.D); + + ret = check_fec (dec, &fec); + + if (ret == GST_FLOW_CUSTOM_SUCCESS) { + item = g_malloc0 (sizeof (Item)); + item->buffer = buffer; + item->seq = fec.seq; + + if (!fec.D) { + guint i; + guint16 seq; + + for (i = 0; i < dec->d; i++) { + seq = fec.seq + i * dec->l; + g_hash_table_insert (dec->column_fec_packets, GUINT_TO_POINTER (seq), + item); + } + } + g_sequence_insert_sorted (dec->fec_packets[fec.D], item, + (GCompareDataFunc) cmp_items, NULL); + ret = GST_FLOW_OK; + } else { + goto discard; + } + + gst_rtp_buffer_unmap (&rtp); + +done: + GST_OBJECT_UNLOCK (dec); + return ret; + +discard: + if (rtp.buffer != NULL) + gst_rtp_buffer_unmap (&rtp); + + gst_buffer_unref (buffer); + + goto done; +} + +static GstFlowReturn +gst_rtpst_2022_1_fecdec_sink_chain (GstPad * pad, GstObject * parent, + GstBuffer * buffer) +{ + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (parent); + GstFlowReturn ret = GST_FLOW_OK; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + + if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)) { + GST_WARNING_OBJECT (pad, "Chained buffer isn't valid RTP"); + goto error; + } + + GST_OBJECT_LOCK (dec); + dec->max_arrival_time = + MAX (dec->max_arrival_time, GST_BUFFER_DTS_OR_PTS (buffer)); + trim_items (dec); + ret = store_media (dec, &rtp, buffer); + GST_OBJECT_UNLOCK (dec); + + gst_rtp_buffer_unmap (&rtp); + + if (ret == GST_FLOW_OK) + ret = gst_pad_push (dec->srcpad, buffer); + +done: + return ret; + +error: + gst_buffer_unref (buffer); + goto done; +} + +static gboolean +gst_rtpst_2022_1_fecdec_src_event (GstPad * pad, GstObject * parent, + GstEvent * event) +{ + gboolean handled = FALSE; + gboolean ret = TRUE; + + if (!handled) { + gst_pad_event_default (pad, parent, event); + } + + return ret; +} + +/* Takes the object lock */ +static void +gst_rtpst_2022_1_fecdec_reset (GstRTPST_2022_1_FecDec * dec, gboolean allocate) +{ + guint i; + + GST_OBJECT_LOCK (dec); + + if (dec->packets) { + g_sequence_free (dec->packets); + dec->packets = NULL; + } + + if (dec->column_fec_packets) { + g_hash_table_unref (dec->column_fec_packets); + dec->column_fec_packets = NULL; + } + + if (allocate) { + dec->packets = g_sequence_new ((GDestroyNotify) free_item); + dec->column_fec_packets = g_hash_table_new (g_direct_hash, g_direct_equal); + } + + for (i = 0; i < 2; i++) { + if (dec->fec_packets[i]) { + g_sequence_free (dec->fec_packets[i]); + dec->fec_packets[i] = NULL; + } + + if (allocate) + dec->fec_packets[i] = g_sequence_new ((GDestroyNotify) free_item); + } + + dec->d = G_MAXUINT; + dec->l = G_MAXUINT; + + GST_OBJECT_UNLOCK (dec); +} + +static GstStateChangeReturn +gst_rtpst_2022_1_fecdec_change_state (GstElement * element, + GstStateChange transition) +{ + GstStateChangeReturn ret; + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (element); + + switch (transition) { + case GST_STATE_CHANGE_READY_TO_PAUSED: + gst_rtpst_2022_1_fecdec_reset (dec, TRUE); + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_rtpst_2022_1_fecdec_reset (dec, FALSE); + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + return ret; +} + +static void +gst_rtpst_2022_1_fecdec_finalize (GObject * object) +{ + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (object); + + gst_rtpst_2022_1_fecdec_reset (dec, FALSE); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_rtpst_2022_1_fecdec_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (object); + + switch (prop_id) { + case PROP_SIZE_TIME: + dec->size_time = g_value_get_uint64 (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rtpst_2022_1_fecdec_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (object); + + switch (prop_id) { + case PROP_SIZE_TIME: + g_value_set_uint64 (value, dec->size_time); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static gboolean +gst_2d_fec_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (parent); + gboolean ret; + + if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP) + gst_rtpst_2022_1_fecdec_reset (dec, TRUE); + + ret = gst_pad_event_default (pad, parent, event); + + return ret; +} + +static GstIterator * +gst_rtpst_2022_1_fecdec_iterate_linked_pads (GstPad * pad, GstObject * parent) +{ + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (parent); + GstPad *otherpad = NULL; + GstIterator *it = NULL; + GValue val = { 0, }; + + if (pad == dec->srcpad) + otherpad = dec->sinkpad; + else if (pad == dec->sinkpad) + otherpad = dec->srcpad; + + if (otherpad) { + g_value_init (&val, GST_TYPE_PAD); + g_value_set_object (&val, otherpad); + it = gst_iterator_new_single (GST_TYPE_PAD, &val); + g_value_unset (&val); + } + + return it; +} + +static GstPad * +gst_rtpst_2022_1_fecdec_request_new_pad (GstElement * element, + GstPadTemplate * templ, const gchar * name, const GstCaps * caps) +{ + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (element); + GstPad *sinkpad = NULL; + + GST_DEBUG_OBJECT (element, "requesting pad"); + + if (g_list_length (dec->fec_sinkpads) > 1) { + GST_ERROR_OBJECT (dec, "not accepting more than two fec streams"); + goto done; + } + + sinkpad = gst_pad_new_from_template (templ, name); + gst_pad_set_chain_function (sinkpad, gst_rtpst_2022_1_fecdec_sink_chain_fec); + gst_element_add_pad (GST_ELEMENT (dec), sinkpad); + gst_pad_set_iterate_internal_links_function (sinkpad, + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_iterate_linked_pads)); + + gst_pad_set_active (sinkpad, TRUE); + + GST_DEBUG_OBJECT (element, "requested pad %s:%s", + GST_DEBUG_PAD_NAME (sinkpad)); + +done: + return sinkpad; +} + +static void +gst_rtpst_2022_1_fecdec_release_pad (GstElement * element, GstPad * pad) +{ + GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (element); + + GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad)); + + dec->fec_sinkpads = g_list_remove (dec->fec_sinkpads, pad); + + gst_pad_set_active (pad, FALSE); + gst_element_remove_pad (GST_ELEMENT_CAST (dec), pad); +} + +static void +gst_rtpst_2022_1_fecdec_class_init (GstRTPST_2022_1_FecDecClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); + + gobject_class->set_property = + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_set_property); + gobject_class->get_property = + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_get_property); + gobject_class->finalize = + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_finalize); + + g_object_class_install_property (gobject_class, PROP_SIZE_TIME, + g_param_spec_uint64 ("size-time", "Storage size (in ns)", + "The amount of data to store (in ns, 0-disable)", 0, + G_MAXUINT64, DEFAULT_SIZE_TIME, + G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS)); + + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_change_state); + gstelement_class->request_new_pad = + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_request_new_pad); + gstelement_class->release_pad = + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_release_pad); + + gst_element_class_set_static_metadata (gstelement_class, + "SMPTE 2022-1 FEC decoder", "SMPTE 2022-1 FEC decoding", + "performs FEC as described by SMPTE 2022-1", + "Mathieu Duponchelle "); + + gst_element_class_add_static_pad_template (gstelement_class, &sink_template); + gst_element_class_add_static_pad_template (gstelement_class, + &fec_sink_template); + gst_element_class_add_static_pad_template (gstelement_class, &src_template); + + GST_DEBUG_CATEGORY_INIT (gst_rtpst_2022_1_fecdec_debug, + "rtpst2022-1-fecdec", 0, "SMPTE 2022-1 FEC decoder element"); +} + +static void +gst_rtpst_2022_1_fecdec_init (GstRTPST_2022_1_FecDec * dec) +{ + dec->srcpad = gst_pad_new_from_static_template (&src_template, "src"); + GST_PAD_SET_PROXY_CAPS (dec->srcpad); + gst_pad_use_fixed_caps (dec->srcpad); + gst_pad_set_event_function (dec->srcpad, + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_src_event)); + gst_pad_set_iterate_internal_links_function (dec->srcpad, + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_iterate_linked_pads)); + gst_element_add_pad (GST_ELEMENT (dec), dec->srcpad); + + dec->sinkpad = gst_pad_new_from_static_template (&sink_template, "sink"); + GST_PAD_SET_PROXY_CAPS (dec->sinkpad); + gst_pad_set_chain_function (dec->sinkpad, gst_rtpst_2022_1_fecdec_sink_chain); + gst_pad_set_event_function (dec->sinkpad, + GST_DEBUG_FUNCPTR (gst_2d_fec_sink_event)); + gst_pad_set_iterate_internal_links_function (dec->sinkpad, + GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_iterate_linked_pads)); + gst_element_add_pad (GST_ELEMENT (dec), dec->sinkpad); + + dec->d = G_MAXUINT; + dec->l = G_MAXUINT; +} diff --git a/gst/rtpmanager/gstrtpst2022-1-fecdec.h b/gst/rtpmanager/gstrtpst2022-1-fecdec.h new file mode 100644 index 0000000..104b157 --- /dev/null +++ b/gst/rtpmanager/gstrtpst2022-1-fecdec.h @@ -0,0 +1,37 @@ +/* GStreamer + * Copyright (C) <2020> Mathieu Duponchelle + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_RTPST_2022_1_FECDEC_H__ +#define __GST_RTPST_2022_1_FECDEC_H__ + +#include + +G_BEGIN_DECLS + +typedef struct _GstRTPST_2022_1_FecDecClass GstRTPST_2022_1_FecDecClass; +typedef struct _GstRTPST_2022_1_FecDec GstRTPST_2022_1_FecDec; + +#define GST_TYPE_RTPST_2022_1_FECDEC (gst_rtpst_2022_1_fecdec_get_type()) +#define GST_RTPST_2022_1_FECDEC_CAST(obj) ((GstRTPST_2022_1_FecDec *)(obj)) + +GType gst_rtpst_2022_1_fecdec_get_type (void); + +G_END_DECLS + +#endif /* __GST_RTPST_2022_1_FECDEC_H__ */ diff --git a/gst/rtpmanager/meson.build b/gst/rtpmanager/meson.build index 118a1e1..5cb6084 100644 --- a/gst/rtpmanager/meson.build +++ b/gst/rtpmanager/meson.build @@ -17,6 +17,7 @@ rtpmanager_sources = [ 'rtptwcc.c', 'gstrtpsession.c', 'gstrtpfunnel.c', + 'gstrtpst2022-1-fecdec.c' ] gstrtpmanager = library('gstrtpmanager', diff --git a/tests/check/elements/rtpst2022-1-fecdec.c b/tests/check/elements/rtpst2022-1-fecdec.c new file mode 100644 index 0000000..dd1ec45 --- /dev/null +++ b/tests/check/elements/rtpst2022-1-fecdec.c @@ -0,0 +1,444 @@ +/* GStreamer + * Copyright (C) <2020> Mathieu Duponchelle + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#include +#include +#include +#include + +static GstBuffer * +make_fec_sample (guint16 seq, guint32 ts, guint16 seq_base, gboolean row, + guint8 offset, guint8 NA, guint32 ts_recovery, guint8 * fec_payload, + guint fec_payload_len, guint16 length_recovery) +{ + GstBuffer *ret; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + GstBitWriter bits; + guint8 *data; + + ret = gst_rtp_buffer_new_allocate (16 + fec_payload_len, 0, 0); + + fail_unless (gst_rtp_buffer_map (ret, GST_MAP_WRITE, &rtp)); + + data = gst_rtp_buffer_get_payload (&rtp); + memset (data, 0x00, 16); + + gst_bit_writer_init_with_data (&bits, data, 17, FALSE); + + gst_bit_writer_put_bits_uint16 (&bits, seq_base, 16); /* SNBase low bits */ + gst_bit_writer_put_bits_uint16 (&bits, length_recovery, 16); /* Length Recovery */ + gst_bit_writer_put_bits_uint8 (&bits, 1, 1); /* E */ + gst_bit_writer_put_bits_uint8 (&bits, 0x21, 7); /* PT recovery */ + gst_bit_writer_put_bits_uint32 (&bits, 0, 24); /* Mask */ + gst_bit_writer_put_bits_uint32 (&bits, ts_recovery, 32); /* TS recovery */ + gst_bit_writer_put_bits_uint8 (&bits, 0, 1); /* N */ + gst_bit_writer_put_bits_uint8 (&bits, row ? 1 : 0, 1); /* D */ + gst_bit_writer_put_bits_uint8 (&bits, 0, 3); /* type */ + gst_bit_writer_put_bits_uint8 (&bits, 0, 3); /* index */ + gst_bit_writer_put_bits_uint8 (&bits, offset, 8); /* Offset */ + gst_bit_writer_put_bits_uint8 (&bits, NA, 8); /* NA */ + gst_bit_writer_put_bits_uint8 (&bits, 0, 8); /* SNBase ext bits */ + + memcpy (data + 16, fec_payload, fec_payload_len); + + gst_bit_writer_reset (&bits); + + GST_MEMDUMP ("fec", data, 16 + fec_payload_len); + + gst_rtp_buffer_set_payload_type (&rtp, 96); + gst_rtp_buffer_set_seq (&rtp, seq); + gst_rtp_buffer_set_timestamp (&rtp, ts); + gst_rtp_buffer_unmap (&rtp); + + return ret; +} + +static GstBuffer * +make_media_sample (guint16 seq, guint32 ts, guint8 * payload, guint payload_len) +{ + GstBuffer *ret; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + guint8 *data; + + ret = gst_rtp_buffer_new_allocate (payload_len, 0, 0); + + gst_rtp_buffer_map (ret, GST_MAP_WRITE, &rtp); + gst_rtp_buffer_set_payload_type (&rtp, 33); + gst_rtp_buffer_set_seq (&rtp, seq); + gst_rtp_buffer_set_timestamp (&rtp, ts); + data = gst_rtp_buffer_get_payload (&rtp); + memcpy (data, payload, payload_len); + gst_rtp_buffer_unmap (&rtp); + + return ret; +} + +static void +pull_and_check (GstHarness * h, guint16 seq, guint32 ts, guint8 * payload, + guint payload_len, guint n_in_queue) +{ + GstBuffer *buffer; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + guint8 *data; + guint i; + + fail_unless_equals_int (gst_harness_buffers_in_queue (h), n_in_queue); + buffer = gst_harness_pull (h); + + fail_unless (gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)); + + fail_unless_equals_int (gst_rtp_buffer_get_seq (&rtp), seq); + fail_unless_equals_int (gst_rtp_buffer_get_timestamp (&rtp), ts); + fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp), 33); + fail_unless_equals_int (gst_rtp_buffer_get_payload_len (&rtp), payload_len); + data = gst_rtp_buffer_get_payload (&rtp); + + for (i = 0; i < payload_len; i++) + fail_unless_equals_int (data[i], payload[i]); + + gst_rtp_buffer_unmap (&rtp); + + gst_buffer_unref (buffer); +} + +/** + * +--------------+ + * | 9 | 10 | x | l1 + * | 12 | 13 | x | l2 + * | x | x | x | + * +--------------+ + * x x x + * + * Missing values: + * 11: 0xc5 + * 14: 0xb8 + */ +GST_START_TEST (test_row) +{ + guint8 payload; + GstHarness *h = + gst_harness_new_with_padnames ("rtpst2022-1-fecdec", NULL, "src"); + GstHarness *h0 = gst_harness_new_with_element (h->element, "sink", NULL); + GstHarness *h_fec_1 = + gst_harness_new_with_element (h->element, "fec_1", NULL); + + gst_harness_set_src_caps_str (h0, "application/x-rtp"); + gst_harness_set_src_caps_str (h_fec_1, "application/x-rtp"); + + payload = 0x37; + gst_harness_push (h0, make_media_sample (9, 0, &payload, 1)); + payload = 0x28; + gst_harness_push (h0, make_media_sample (10, 0, &payload, 1)); + payload = 0xff; + gst_harness_push (h0, make_media_sample (12, 0, &payload, 1)); + + /* We receive 9, 10 and 12 */ + fail_unless_equals_int (gst_harness_buffers_in_queue (h), 3); + while (gst_harness_buffers_in_queue (h)) { + gst_buffer_unref (gst_harness_pull (h)); + } + + payload = 0xda; + gst_harness_push (h_fec_1, make_fec_sample (0, 0, 9, TRUE, 1, 3, 0, &payload, + 1, 1)); + + /* After pushing l1, we should have enough info to reconstruct 11 */ + payload = 0xc5; + pull_and_check (h, 11, 0, &payload, 1, 1); + + /* Now we try to push l2 before 13, to verify that 14 is eventually + * reconstructed once 13 is pushed */ + payload = 0x02; + gst_harness_push (h_fec_1, make_fec_sample (1, 0, 12, TRUE, 1, 3, 0, &payload, + 1, 1)); + fail_unless_equals_int (gst_harness_buffers_in_queue (h), 0); + payload = 0x45; + gst_harness_push (h0, make_media_sample (13, 0, &payload, 1)); + fail_unless_equals_int (gst_harness_buffers_in_queue (h), 2); + payload = 0xb8; + pull_and_check (h, 14, 0, &payload, 1, 2); + payload = 0x45; + pull_and_check (h, 13, 0, &payload, 1, 1); + + gst_harness_teardown (h); + gst_harness_teardown (h0); + gst_harness_teardown (h_fec_1); +} + +GST_END_TEST; + +/** + * +--------------+ + * | 7 | 8 | x | x + * | 10 | 11 | x | x + * | x | x | x | + * +--------------+ + * d1 d2 x + * + * Missing values: + * 13: 0xc5 + * 14: 0x51 + */ +GST_START_TEST (test_column) +{ + guint8 payload; + GstHarness *h = + gst_harness_new_with_padnames ("rtpst2022-1-fecdec", NULL, "src"); + GstHarness *h0 = gst_harness_new_with_element (h->element, "sink", NULL); + GstHarness *h_fec_0 = + gst_harness_new_with_element (h->element, "fec_0", NULL); + + gst_harness_set_src_caps_str (h0, "application/x-rtp"); + gst_harness_set_src_caps_str (h_fec_0, "application/x-rtp"); + + payload = 0x37; + gst_harness_push (h0, make_media_sample (7, 0, &payload, 1)); + payload = 0x28; + gst_harness_push (h0, make_media_sample (10, 0, &payload, 1)); + + fail_unless_equals_int (gst_harness_buffers_in_queue (h), 2); + while (gst_harness_buffers_in_queue (h)) + gst_buffer_unref (gst_harness_pull (h)); + + payload = 0xda; + gst_harness_push (h_fec_0, make_fec_sample (0, 0, 7, FALSE, 3, 3, 0, &payload, + 1, 1)); + + /* After pushing d1, we should have enough info to reconstruct 13 */ + payload = 0xc5; + pull_and_check (h, 13, 0, &payload, 1, 1); + + /* Now we try to push d2 before 8 and 11, to verify that 14 is eventually + * reconstructed once 11 is pushed */ + payload = 0x04; + gst_harness_push (h_fec_0, make_fec_sample (1, 0, 8, FALSE, 3, 3, 0, &payload, + 1, 1)); + payload = 0x21; + gst_harness_push (h0, make_media_sample (8, 0, &payload, 1)); + + fail_unless_equals_int (gst_harness_buffers_in_queue (h), 1); + while (gst_harness_buffers_in_queue (h)) + gst_buffer_unref (gst_harness_pull (h)); + + payload = 0x74; + gst_harness_push (h0, make_media_sample (11, 0, &payload, 1)); + payload = 0x51; + pull_and_check (h, 14, 0, &payload, 1, 2); + payload = 0x74; + pull_and_check (h, 11, 0, &payload, 1, 1); + + gst_harness_teardown (h); + gst_harness_teardown (h0); + gst_harness_teardown (h_fec_0); +} + +GST_END_TEST; + + +/* + * +-----------+ + * | 0 | 1 | x | x + * | 3 | 4 | x | l1 + * | 6 | x | x | l2 + * +-----------+ + * d0 d1 d2 + * + * We should be able to retrieve 2 by retrieving 5 7 and 8 first. + * + * Missing values: + * 2: 0xfc + * 5: 0x3a + * 7: 0x5f + * 8: 0x21 + */ + +GST_START_TEST (test_2d) +{ + guint8 payload; + GstHarness *h = + gst_harness_new_with_padnames ("rtpst2022-1-fecdec", NULL, "src"); + GstHarness *h0 = gst_harness_new_with_element (h->element, "sink", NULL); + GstHarness *h_fec_0 = + gst_harness_new_with_element (h->element, "fec_0", NULL); + GstHarness *h_fec_1 = + gst_harness_new_with_element (h->element, "fec_1", NULL); + + gst_harness_set_src_caps_str (h0, "application/x-rtp"); + gst_harness_set_src_caps_str (h_fec_0, "application/x-rtp"); + gst_harness_set_src_caps_str (h_fec_1, "application/x-rtp"); + + payload = 0xde; + gst_harness_push (h0, make_media_sample (0, 0, &payload, 1)); + payload = 0xad; + gst_harness_push (h0, make_media_sample (1, 0, &payload, 1)); + payload = 0xbe; + gst_harness_push (h0, make_media_sample (3, 0, &payload, 1)); + payload = 0xef; + gst_harness_push (h0, make_media_sample (4, 0, &payload, 1)); + payload = 0x42; + gst_harness_push (h0, make_media_sample (6, 0, &payload, 1)); + + /* row FEC */ + /* l1 0xbe ^ 0xef ^ 0x3a */ + payload = 0x6b; + gst_harness_push (h_fec_1, make_fec_sample (0, 0, 3, TRUE, 1, 3, 0, &payload, + 1, 1)); + /* l2 0x42 ^ 0x5f ^ 0x21 */ + payload = 0x3c; + gst_harness_push (h_fec_1, make_fec_sample (0, 0, 6, TRUE, 1, 3, 0, &payload, + 1, 1)); + + /* column FEC */ + /* d0 0xde ^ 0xbe ^ 0x42 */ + payload = 0x22; + gst_harness_push (h_fec_0, make_fec_sample (0, 0, 0, FALSE, 3, 3, 0, &payload, + 1, 1)); + /* d1 0xad ^ 0xef ^ 0x5f */ + payload = 0x1d; + gst_harness_push (h_fec_0, make_fec_sample (1, 0, 1, FALSE, 3, 3, 0, &payload, + 1, 1)); + /* d2 0xfc ^ 0x3a ^ 0x21 */ + payload = 0xe7; + gst_harness_push (h_fec_0, make_fec_sample (2, 0, 2, FALSE, 3, 3, 0, &payload, + 1, 1)); + + /* We should retrieve all 9 packets despite dropping 4! */ + payload = 0xde; + pull_and_check (h, 0, 0, &payload, 1, 9); + payload = 0xad; + pull_and_check (h, 1, 0, &payload, 1, 8); + payload = 0xbe; + pull_and_check (h, 3, 0, &payload, 1, 7); + payload = 0xef; + pull_and_check (h, 4, 0, &payload, 1, 6); + payload = 0x42; + pull_and_check (h, 6, 0, &payload, 1, 5); + payload = 0x3a; + pull_and_check (h, 5, 0, &payload, 1, 4); + payload = 0x21; + pull_and_check (h, 8, 0, &payload, 1, 3); + payload = 0x5f; + pull_and_check (h, 7, 0, &payload, 1, 2); + payload = 0xfc; + pull_and_check (h, 2, 0, &payload, 1, 1); + + gst_harness_teardown (h); + gst_harness_teardown (h0); + gst_harness_teardown (h_fec_0); + gst_harness_teardown (h_fec_1); +} + +GST_END_TEST; + +static void +_xor_mem (guint8 * restrict dst, const guint8 * restrict src, gsize length) +{ + guint i; + + for (i = 0; i < (length / sizeof (guint64)); ++i) { +#if G_BYTE_ORDER == G_LITTLE_ENDIAN + GST_WRITE_UINT64_LE (dst, + GST_READ_UINT64_LE (dst) ^ GST_READ_UINT64_LE (src)); +#else + GST_WRITE_UINT64_BE (dst, + GST_READ_UINT64_BE (dst) ^ GST_READ_UINT64_BE (src)); +#endif + dst += sizeof (guint64); + src += sizeof (guint64); + } + for (i = 0; i < (length % sizeof (guint64)); ++i) + dst[i] ^= src[i]; +} + +/** + * +-----------------+ + * | 0-1 | 1-3 | x-4 | l1 + * +-----------------+ + * x x x + * + * Missing values: + * 2: 0xc5b74108 + */ +GST_START_TEST (test_variable_length) +{ + guint8 payload[4]; + guint8 fec_payload[4]; + GstHarness *h = + gst_harness_new_with_padnames ("rtpst2022-1-fecdec", NULL, "src"); + GstHarness *h0 = gst_harness_new_with_element (h->element, "sink", NULL); + GstHarness *h_fec_1 = + gst_harness_new_with_element (h->element, "fec_1", NULL); + + gst_harness_set_src_caps_str (h0, "application/x-rtp"); + gst_harness_set_src_caps_str (h_fec_1, "application/x-rtp"); + + memset (fec_payload, 0x00, 4); + + payload[0] = 0x37; + _xor_mem (fec_payload, payload, 1); + gst_harness_push (h0, make_media_sample (0, 0, payload, 1)); + + payload[0] = 0x28; + payload[1] = 0x39; + payload[2] = 0x56; + _xor_mem (fec_payload, payload, 3); + gst_harness_push (h0, make_media_sample (1, 0, payload, 3)); + + /* We receive 0 and 1 */ + fail_unless_equals_int (gst_harness_buffers_in_queue (h), 2); + while (gst_harness_buffers_in_queue (h)) { + gst_buffer_unref (gst_harness_pull (h)); + } + + payload[0] = 0xc5; + payload[1] = 0xb7; + payload[2] = 0x41; + payload[3] = 0x08; + + _xor_mem (fec_payload, payload, 4); + gst_harness_push (h_fec_1, make_fec_sample (0, 0, 0, TRUE, 1, 3, 0, + fec_payload, 4, 1 ^ 3 ^ 4)); + + pull_and_check (h, 2, 0, payload, 4, 1); + + gst_harness_teardown (h); + gst_harness_teardown (h0); + gst_harness_teardown (h_fec_1); +} + +GST_END_TEST; + + +static Suite * +st2022_1_dec_suite (void) +{ + Suite *s = suite_create ("rtpst2022-1-fecdec"); + TCase *tc_chain = tcase_create ("general"); + + suite_add_tcase (s, tc_chain); + + tcase_add_test (tc_chain, test_row); + tcase_add_test (tc_chain, test_column); + tcase_add_test (tc_chain, test_2d); + tcase_add_test (tc_chain, test_variable_length); + + return s; +} + +GST_CHECK_MAIN (st2022_1_dec) diff --git a/tests/check/meson.build b/tests/check/meson.build index 1742946..6f8dd9c 100644 --- a/tests/check/meson.build +++ b/tests/check/meson.build @@ -88,6 +88,7 @@ good_tests = [ [ 'elements/rtpulpfec' ], [ 'elements/rtpssrcdemux' ], [ 'elements/rtp-payloading' ], + [ 'elements/rtpst2022-1-fecdec' ], [ 'elements/spectrum', false, [gstfft_dep] ], [ 'elements/shapewipe' ], [ 'elements/udpsink' ], -- 2.7.4