1 /* GStreamer plugin for forward error correction
2 * Copyright (C) 2017 Pexip
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18 * Author: Mikhail Fludkov <misha@pexip.com>
22 * SECTION:element-rtpreddec
23 * @short_description: RTP Redundant Audio Data (RED) decoder
26 * Decode Redundant Audio Data (RED) as per RFC 2198.
28 * This element is mostly provided for chrome webrtc compatibility:
29 * chrome will wrap ulpfec-protected streams in RED packets, and such
30 * streams need to be unwrapped by this element before being passed on
31 * to #GstRtpUlpFecDec.
33 * The #GstRtpRedDec:pt property should be set to the expected payload
34 * types of the RED packets.
36 * When using #GstRtpBin, this element should be inserted through the
37 * #GstRtpBin::request-aux-receiver signal.
42 * gst-launch-1.0 udpsrc port=8888 caps="application/x-rtp, payload=96, clock-rate=90000" ! rtpreddec pt=122 ! rtpstorage size-time=220000000 ! rtpssrcdemux ! application/x-rtp, payload=96, clock-rate=90000, media=video, encoding-name=H264 ! rtpjitterbuffer do-lost=1 latency=200 ! rtpulpfecdec pt=122 ! rtph264depay ! avdec_h264 ! videoconvert ! autovideosink
43 * ]| This example will receive a stream with RED and ULP FEC and try to reconstruct the packets.
45 * See also: #GstRtpRedEnc, #GstWebRTCBin, #GstRtpBin
49 #include <gst/rtp/gstrtpbuffer.h>
51 #include "gstrtpelements.h"
52 #include "rtpredcommon.h"
53 #include "gstrtpreddec.h"
54 #include "rtpulpfeccommon.h"
/* Upper bound on the number of entries kept in one history queue; once it is
 * reached the oldest entry is recycled instead of allocating a new one. */
#define RTP_HISTORY_MAX_SIZE (16)

/* Field accessors for a history entry reached through an untyped pointer
 * (for example the data pointer of a GList node).  The argument and the
 * whole expansion are parenthesised so that any pointer expression can be
 * passed safely and the result composes with surrounding operators. */
#define RTP_HIST_ITEM_TIMESTAMP(p) (((RTPHistItem *) (p))->timestamp)
#define RTP_HIST_ITEM_SEQ(p) (((RTPHistItem *) (p))->seq)
/* Static pad templates: both the "sink" and the "src" pad carry
 * application/x-rtp caps. */
67 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
70 GST_STATIC_CAPS ("application/x-rtp"));
72 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
75 GST_STATIC_CAPS ("application/x-rtp"));
/* Lower bound and default of the "pt" property are both UNDEF_PT (defined
 * elsewhere); the chain function treats pt == UNDEF_PT together with an unset
 * payload list as "nothing configured" and passes buffers through. */
78 #define MIN_PT UNDEF_PT
80 #define DEFAULT_PT UNDEF_PT
/* Debug category used by the GST_*_OBJECT logging calls in this file. */
82 GST_DEBUG_CATEGORY_STATIC (gst_rtp_red_dec_debug);
83 #define GST_CAT_DEFAULT gst_rtp_red_dec_debug
/* GstRtpRedDec derives from GstElement and is registered under the element
 * name "rtpreddec" with rank NONE. */
85 G_DEFINE_TYPE (GstRtpRedDec, gst_rtp_red_dec, GST_TYPE_ELEMENT);
86 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (rtpreddec, "rtpreddec", GST_RANK_NONE,
87 GST_TYPE_RTP_RED_DEC, rtp_element_init (plugin));
98 rtp_hist_item_alloc (void)
100 return g_slice_new (RTPHistItem);
104 rtp_hist_item_free (gpointer item)
106 g_slice_free (RTPHistItem, item);
/* Comparison callback passed to g_list_find_custom() when a new entry is
 * inserted into a history queue.
 * @item: list element, read as an RTPHistItem via RTP_HIST_ITEM_TIMESTAMP
 * @timestamp: the RTP timestamp searched for, packed into the pointer value
 *
 * The difference "searched - stored" is computed on unsigned 32-bit values
 * and then held in a gint32, i.e. it is interpreted as a signed distance.
 * NOTE(review): the statement that turns diff into the return value is not
 * visible in this view; going by the name, a match (0) is presumably reported
 * for entries whose timestamp is less than or equal to the searched one -
 * confirm against the full file. */
110 gst_rtp_red_history_find_less_or_equal (gconstpointer item,
111 gconstpointer timestamp)
113 guint32 t = GPOINTER_TO_UINT (timestamp);
114 gint32 diff = t - RTP_HIST_ITEM_TIMESTAMP (item);
/* Comparison callback passed to g_list_find_custom() when looking up the
 * history entry that precedes a given RTP timestamp.
 * @item: list element, read as an RTPHistItem via RTP_HIST_ITEM_TIMESTAMP
 * @timestamp: the RTP timestamp searched for, packed into the pointer value
 *
 * The difference "searched - stored" is computed on unsigned 32-bit values
 * and then held in a gint32, i.e. it is interpreted as a signed distance.
 * NOTE(review): the statement that turns diff into the return value is not
 * visible in this view; going by the name, a match (0) is presumably reported
 * only for entries whose timestamp is strictly less than the searched one -
 * confirm against the full file. */
119 gst_rtp_red_history_find_less (gconstpointer item, gconstpointer timestamp)
121 guint32 t = GPOINTER_TO_UINT (timestamp);
122 gint32 diff = t - RTP_HIST_ITEM_TIMESTAMP (item);
/* Records the RTP timestamp and sequence number of a packet in a history
 * queue.
 * @self: the decoder instance
 * @rtp_history: the queue to update
 * The packet is read through `rtp` (its parameter declaration is not visible
 * in this view).
 *
 * While the queue holds fewer than RTP_HISTORY_MAX_SIZE entries a new item
 * and list link are allocated; otherwise the tail link is popped for reuse.
 * The link is then inserted at the position of the first entry matched by
 * gst_rtp_red_history_find_less_or_equal(); g_list_position() is given the
 * result of that search directly, including the case where nothing matched.
 * NOTE(review): the code that ties `item` to `link` in either branch is not
 * visible here - confirm against the full file. */
127 gst_rtp_red_history_update (GstRtpRedDec * self, GQueue * rtp_history,
131 GList *link, *sibling;
133 /* If we have not reached MAX number of elements in the history,
134 * allocate a new link and a new item,
135 * otherwise reuse the tail (the oldest data) without any reallocations
137 if (rtp_history->length < RTP_HISTORY_MAX_SIZE) {
138 item = rtp_hist_item_alloc ();
139 link = g_list_alloc ();
142 link = g_queue_pop_tail_link (rtp_history);
146 item->timestamp = gst_rtp_buffer_get_timestamp (rtp);
147 item->seq = gst_rtp_buffer_get_seq (rtp);
149 /* Looking for a place to insert new link.
150 * The queue has newest to oldest rtp timestamps, so in 99% cases
151 * it is inserted before the head of the queue */
152 sibling = g_list_find_custom (rtp_history->head,
153 GUINT_TO_POINTER (item->timestamp),
154 gst_rtp_red_history_find_less_or_equal);
155 g_queue_push_nth_link (rtp_history,
156 g_list_position (rtp_history->head, sibling), link);
/* Checks that the payload of a RED packet is structurally consistent before
 * it is split into individual packets.
 * @self: the decoder instance (used for logging)
 * @red_rtp: the mapped RED packet
 * @dst_first_red_payload_offset: on success receives the summed length of
 *   all RED block headers, i.e. the payload offset of the first block's data
 *
 * For every block header the code verifies that the header bytes lie inside
 * the payload. Redundant block headers contribute their declared payload
 * length to redundant_payload_len. The final check requires headers plus
 * redundant data to be strictly smaller than the payload length. Every failed
 * check jumps to red_buffer_invalid, which logs a warning describing the
 * packet.
 * NOTE(review): the loop construct, the branch on is_redundant and the return
 * statements are not visible in this view. The caller uses the result as a
 * boolean, so presumably TRUE means valid - confirm. */
160 rtp_red_buffer_is_valid (GstRtpRedDec * self, GstRTPBuffer * red_rtp,
161 gsize * dst_first_red_payload_offset)
163 guint8 *payload = gst_rtp_buffer_get_payload (red_rtp);
164 gsize payload_len = gst_rtp_buffer_get_payload_len (red_rtp);
165 gsize red_hdrs_offset = 0;
166 guint red_hdrs_checked = 0;
167 guint redundant_payload_len = 0;
170 gpointer red_hdr = payload + red_hdrs_offset;
172 gboolean is_redundant;
176 /* Can we address the first byte where F bit is located ? */
177 if (red_hdrs_offset + 1 > payload_len)
178 goto red_buffer_invalid;
180 is_redundant = rtp_red_block_is_redundant (red_hdr);
182 /* Is it the last block? */
184 red_hdr_len = rtp_red_block_header_get_length (TRUE);
186 /* Can we address all the other bytes in RED block header? */
187 if (red_hdrs_offset + red_hdr_len > payload_len)
188 goto red_buffer_invalid;
190 redundant_payload_len += rtp_red_block_get_payload_length (red_hdr);
191 red_hdrs_offset += red_hdr_len;
193 red_hdr_len = rtp_red_block_header_get_length (FALSE);
194 red_hdrs_offset += red_hdr_len;
199 /* Do we have enough data to create redundant packets & main packet. Keep in
200 * mind that redundant_payload_len contains the length of redundant packets only.
202 if (red_hdrs_offset + redundant_payload_len >= payload_len)
203 goto red_buffer_invalid;
205 *dst_first_red_payload_offset = red_hdrs_offset;
207 GST_LOG_OBJECT (self, "RED packet has %u blocks", red_hdrs_checked);
211 GST_WARNING_OBJECT (self, "Received invalid RED packet "
212 "ssrc=0x%08x pt=%u tstamp=%u seq=%u size=%u, "
214 gst_rtp_buffer_get_ssrc (red_rtp),
215 gst_rtp_buffer_get_payload_type (red_rtp),
216 gst_rtp_buffer_get_timestamp (red_rtp),
217 gst_rtp_buffer_get_seq (red_rtp),
218 gst_rtp_buffer_get_packet_len (red_rtp), red_hdrs_checked);
/* Tries to work out which sequence number a missing packet with the given RTP
 * timestamp would have had, using the history of received packets.
 * @self: the decoder instance (used for logging)
 * @rtp_history: history queue to search
 * @timestamp: RTP timestamp carried by the redundant block
 * @dst_seq_num: receives the derived sequence number when a match is found
 *
 * The first history entry matched by gst_rtp_red_history_find_less() is taken
 * as @older and its predecessor in the list as @newer. A warning is logged
 * when no such entry exists while the history is full, or when the entry has
 * no predecessor. If @newer already carries the requested timestamp the
 * packet was not lost. Otherwise the timestamps of the sequence numbers
 * between @older and @newer are linearly interpolated, and the one equal to
 * @timestamp yields the result.
 * NOTE(review): return statements and the conditions around the two seq_diff
 * warnings are not visible in this view. The caller treats the result as a
 * boolean "found" flag.
 * NOTE(review): lost_packet_idx * timestamp_diff involves a guint32 operand,
 * so the product looks like it is evaluated in 32 bits and could wrap for
 * large timestamp gaps - confirm whether a wider type is needed. */
223 gst_red_history_lost_seq_num_for_timestamp (GstRtpRedDec * self,
224 GQueue * rtp_history, guint32 timestamp, guint16 * dst_seq_num)
226 GList *older_sibling = g_list_find_custom (rtp_history->head,
227 GUINT_TO_POINTER (timestamp),
228 gst_rtp_red_history_find_less);
231 guint32 timestamp_diff;
232 gint seq_diff, lost_packet_idx;
234 if (NULL == older_sibling) {
235 if (rtp_history->length == RTP_HISTORY_MAX_SIZE)
236 GST_WARNING_OBJECT (self, "History is too short. "
237 "Oldest rtp timestamp %u, looking for %u, size %u",
238 RTP_HIST_ITEM_TIMESTAMP (rtp_history->tail->data),
239 timestamp, rtp_history->length);
243 if (NULL == older_sibling->prev) {
244 GST_WARNING_OBJECT (self, "RED block timestamp offset probably wrong. "
245 "Latest rtp timestamp %u, looking for %u, size %u",
246 RTP_HIST_ITEM_TIMESTAMP (rtp_history->head->data),
247 timestamp, rtp_history->length);
251 older = older_sibling->data;
252 newer = older_sibling->prev->data;
253 /* We know for sure @older has lower timestamp than we are looking for,
254 * if @newer has the same timestamp, there is no packet loss and we
255 * don't need to use redundant data */
256 if (newer->timestamp == timestamp)
259 seq_diff = gst_rtp_buffer_compare_seqnum (older->seq, newer->seq);
262 GST_WARNING_OBJECT (self, "RED block timestamp offset is wrong: "
263 "#%u,%u #%u,%u looking for %u",
264 older->seq, older->timestamp,
265 newer->seq, newer->timestamp, timestamp);
267 GST_WARNING_OBJECT (self, "RTP timestamps increasing while "
268 "sequence numbers decreasing: #%u,%u #%u,%u",
269 older->seq, older->timestamp, newer->seq, newer->timestamp);
273 timestamp_diff = newer->timestamp - older->timestamp;
274 for (lost_packet_idx = 1; lost_packet_idx < seq_diff; ++lost_packet_idx) {
275 guint32 lost_timestamp = older->timestamp +
276 lost_packet_idx * timestamp_diff / seq_diff;
277 if (lost_timestamp == timestamp) {
278 *dst_seq_num = older->seq + lost_packet_idx;
283 GST_WARNING_OBJECT (self, "Can't find RED block timestamp "
284 "#%u,%u #%u,%u looking for %u",
285 older->seq, older->timestamp, newer->seq, newer->timestamp, timestamp);
/* Builds a standalone RTP packet out of one block of a RED packet.
 * @self: the decoder instance
 * @red_rtp: the mapped RED packet the block comes from
 * @marker: value for the RTP marker bit of the new packet
 * @pt: payload type for the new packet
 * @seq_num: sequence number for the new packet
 * @timestamp: RTP timestamp for the new packet
 * @red_payload_subbuffer_start: offset of the block data in the RED payload
 * @red_payload_subbuffer_len: length of the block data
 *
 * A header-only RTP buffer with the same CSRC count as @red_rtp is allocated
 * and filled in. SSRC and CSRCs are copied from @red_rtp. The block data is
 * then appended as a sub-buffer of the RED payload, and the metadata of the
 * RED buffer is copied onto the result.
 * NOTE(review): the line preceding the GST_BUFFER_FLAG_SET call is not
 * visible in this view, so it cannot be told from here whether the MARKER
 * buffer flag is set unconditionally or only when @marker is TRUE. The final
 * return statement is not visible either. */
290 gst_rtp_red_create_packet (GstRtpRedDec * self, GstRTPBuffer * red_rtp,
291 gboolean marker, guint8 pt, guint16 seq_num, guint32 timestamp,
292 gsize red_payload_subbuffer_start, gsize red_payload_subbuffer_len)
294 guint csrc_count = gst_rtp_buffer_get_csrc_count (red_rtp);
295 GstBuffer *ret = gst_rtp_buffer_new_allocate (0, 0, csrc_count);
296 GstRTPBuffer ret_rtp = GST_RTP_BUFFER_INIT;
298 if (!gst_rtp_buffer_map (ret, GST_MAP_WRITE, &ret_rtp))
299 g_assert_not_reached ();
301 gst_rtp_buffer_set_marker (&ret_rtp, marker);
302 gst_rtp_buffer_set_payload_type (&ret_rtp, pt);
303 gst_rtp_buffer_set_seq (&ret_rtp, seq_num);
304 gst_rtp_buffer_set_timestamp (&ret_rtp, timestamp);
305 gst_rtp_buffer_set_ssrc (&ret_rtp, gst_rtp_buffer_get_ssrc (red_rtp));
306 for (i = 0; i < csrc_count; ++i)
307 gst_rtp_buffer_set_csrc (&ret_rtp, i, gst_rtp_buffer_get_csrc (red_rtp, i));
308 gst_rtp_buffer_unmap (&ret_rtp);
310 ret = gst_buffer_append (ret,
311 gst_rtp_buffer_get_payload_subbuffer (red_rtp,
312 red_payload_subbuffer_start, red_payload_subbuffer_len));
314 /* Timestamps, meta, flags from the RED packet should go to main block packet */
315 gst_buffer_copy_into (ret, red_rtp->buffer, GST_BUFFER_COPY_METADATA, 0, -1);
317 GST_BUFFER_FLAG_SET (ret, GST_BUFFER_FLAG_MARKER);
/* Handles one redundant block of a RED packet.
 * @self: the decoder instance
 * @rtp_history: history queue of the packet's SSRC
 * @red_rtp: the mapped RED packet
 * @red_hdr_offset: in/out, payload offset of the current block header
 * @red_payload_offset: in/out, payload offset of the current block data
 *
 * The timestamp of the original packet is the RED packet's timestamp minus
 * the timestamp offset stored in the block header. If the history yields a
 * sequence number for that timestamp, a packet is built with the marker
 * cleared and flagged GST_RTP_BUFFER_FLAG_REDUNDANT. Otherwise the block is
 * only logged as already present. In both cases the two offsets are advanced
 * past this block, by the redundant header length and the block's payload
 * length respectively.
 * NOTE(review): the assignment of the created packet to `ret`, the else
 * keyword and the return statement are not visible in this view. Presumably
 * the function returns `ret`, which is NULL when nothing was recovered -
 * confirm. */
322 gst_rtp_red_create_from_redundant_block (GstRtpRedDec * self,
323 GQueue * rtp_history, GstRTPBuffer * red_rtp, gsize * red_hdr_offset,
324 gsize * red_payload_offset)
326 guint8 *payload = gst_rtp_buffer_get_payload (red_rtp);
327 guint8 *red_hdr = payload + *red_hdr_offset;
328 guint32 lost_timestamp = gst_rtp_buffer_get_timestamp (red_rtp) -
329 rtp_red_block_get_timestamp_offset (red_hdr);
331 GstBuffer *ret = NULL;
332 guint16 lost_seq = 0;
333 if (gst_red_history_lost_seq_num_for_timestamp (self, rtp_history,
334 lost_timestamp, &lost_seq)) {
335 GST_LOG_OBJECT (self,
336 "Recovering from RED packet pt=%u ts=%u seq=%u" " len=%u present",
337 rtp_red_block_get_payload_type (red_hdr), lost_timestamp, lost_seq,
338 rtp_red_block_get_payload_length (red_hdr));
340 gst_rtp_red_create_packet (self, red_rtp, FALSE,
341 rtp_red_block_get_payload_type (red_hdr), lost_seq, lost_timestamp,
342 *red_payload_offset, rtp_red_block_get_payload_length (red_hdr));
343 GST_BUFFER_FLAG_SET (ret, GST_RTP_BUFFER_FLAG_REDUNDANT);
345 GST_LOG_OBJECT (self, "Ignore RED packet pt=%u ts=%u len=%u because already"
346 " present", rtp_red_block_get_payload_type (red_hdr), lost_timestamp,
347 rtp_red_block_get_payload_length (red_hdr));
350 *red_hdr_offset += rtp_red_block_header_get_length (TRUE);
351 *red_payload_offset += rtp_red_block_get_payload_length (red_hdr);
356 gst_rtp_red_create_from_main_block (GstRtpRedDec * self,
357 GstRTPBuffer * red_rtp, gsize red_hdr_offset, gsize * red_payload_offset)
359 guint8 *payload = gst_rtp_buffer_get_payload (red_rtp);
360 GstBuffer *ret = gst_rtp_red_create_packet (self, red_rtp,
361 gst_rtp_buffer_get_marker (red_rtp),
362 rtp_red_block_get_payload_type (payload + red_hdr_offset),
363 gst_rtp_buffer_get_seq (red_rtp),
364 gst_rtp_buffer_get_timestamp (red_rtp),
365 *red_payload_offset, -1);
366 *red_payload_offset = gst_rtp_buffer_get_payload_len (red_rtp);
367 GST_LOG_OBJECT (self, "Extracting main payload from RED pt=%u seq=%u ts=%u"
368 " marker=%u", rtp_red_block_get_payload_type (payload + red_hdr_offset),
369 gst_rtp_buffer_get_seq (red_rtp), gst_rtp_buffer_get_timestamp (red_rtp),
370 gst_rtp_buffer_get_marker (red_rtp));
376 gst_rtp_red_create_from_block (GstRtpRedDec * self, GQueue * rtp_history,
377 GstRTPBuffer * red_rtp, gsize * red_hdr_offset, gsize * red_payload_offset)
379 guint8 *payload = gst_rtp_buffer_get_payload (red_rtp);
381 if (rtp_red_block_is_redundant (payload + (*red_hdr_offset)))
382 return gst_rtp_red_create_from_redundant_block (self, rtp_history, red_rtp,
383 red_hdr_offset, red_payload_offset);
385 return gst_rtp_red_create_from_main_block (self, red_rtp, *red_hdr_offset,
/* Splits a validated RED packet into individual packets and pushes them on
 * the source pad.
 * @self: the decoder instance
 * @rtp_history: history queue of the packet's SSRC
 * @red_rtp: the mapped RED packet
 * @first_red_payload_offset: payload offset of the first block's data
 *
 * Block headers are walked from payload offset 0 and block data from
 * @first_red_payload_offset. The loop continues while the last push
 * returned GST_FLOW_OK and the data offset is still inside the payload.
 * NOTE(review): the loop head, one argument line of the call and the line
 * directly before gst_pad_push() are not visible in this view. Presumably a
 * NULL buffer is skipped rather than pushed - confirm. */
390 gst_rtp_red_process (GstRtpRedDec * self, GQueue * rtp_history,
391 GstRTPBuffer * red_rtp, gsize first_red_payload_offset)
393 gsize red_hdr_offset = 0;
394 gsize red_payload_offset = first_red_payload_offset;
395 gsize payload_len = gst_rtp_buffer_get_payload_len (red_rtp);
396 GstFlowReturn ret = GST_FLOW_OK;
399 GstBuffer *buf = gst_rtp_red_create_from_block (self, rtp_history, red_rtp,
401 &red_payload_offset);
403 ret = gst_pad_push (self->srcpad, buf);
404 } while (GST_FLOW_OK == ret && red_payload_offset < payload_len);
/* Tells whether a payload type should be treated as RED.
 * @self: the decoder instance
 * @pt: payload type of the incoming packet
 *
 * With self->lock held, @pt is compared against self->pt and looked up in
 * the self->payloads hash table.
 * NOTE(review): the lines that store and return the result are not visible in
 * this view. Presumably a match on either source yields TRUE, and a NULL
 * payloads table is guarded by the left-hand operand of the && - confirm. */
410 is_red_pt (GstRtpRedDec * self, guint8 pt)
414 g_mutex_lock (&self->lock);
415 if (pt == self->pt) {
421 && g_hash_table_contains (self->payloads, GINT_TO_POINTER (pt));
424 g_mutex_unlock (&self->lock);
/* Sink pad chain function.
 * @pad: the sink pad
 * @parent: the GstRtpRedDec owning the pad
 * @buffer: the incoming buffer
 *
 * Buffers are pushed downstream untouched when neither a pt nor a payload
 * list is configured, or when the buffer cannot be mapped as RTP. For every
 * mapped packet the history queue of its SSRC is looked up, created on first
 * use, and updated. Packets whose payload type is not RED are then unmapped
 * and pushed as they are. RED packets bump num_received and are handed to
 * gst_rtp_red_process() if they pass rtp_red_buffer_is_valid(). The RED
 * buffer itself is unmapped and unreffed afterwards.
 * NOTE(review): self->pt and self->payloads are read in the first check
 * without taking self->lock, and num_received is incremented without it,
 * while the property setters use the lock - confirm this is intended.
 * NOTE(review): local declarations, the head of the history lookup, the
 * assignment of the process result and the return statement are not visible
 * in this view. */
429 gst_rtp_red_dec_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
431 GstRtpRedDec *self = GST_RTP_RED_DEC (parent);
432 GstRTPBuffer irtp = GST_RTP_BUFFER_INIT;
433 GstFlowReturn ret = GST_FLOW_OK;
434 gsize first_red_payload_offset = 0;
438 if (self->pt == UNDEF_PT && self->payloads == NULL)
439 return gst_pad_push (self->srcpad, buffer);
441 if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &irtp))
442 return gst_pad_push (self->srcpad, buffer);
444 ssrc = gst_rtp_buffer_get_ssrc (&irtp);
447 g_hash_table_lookup (self->rtp_histories, GUINT_TO_POINTER (ssrc)))) {
448 rtp_history = g_queue_new ();
449 g_hash_table_insert (self->rtp_histories, GUINT_TO_POINTER (ssrc),
453 gst_rtp_red_history_update (self, rtp_history, &irtp);
455 if (!is_red_pt (self, gst_rtp_buffer_get_payload_type (&irtp))) {
456 GST_LOG_RTP_PACKET (self, "rtp header (incoming)", &irtp);
458 gst_rtp_buffer_unmap (&irtp);
459 return gst_pad_push (self->srcpad, buffer);
462 self->num_received++;
464 if (rtp_red_buffer_is_valid (self, &irtp, &first_red_payload_offset)) {
465 GST_DEBUG_RTP_PACKET (self, "rtp header (red)", &irtp);
467 gst_rtp_red_process (self, rtp_history, &irtp,
468 first_red_payload_offset);
471 gst_rtp_buffer_unmap (&irtp);
472 gst_buffer_unref (buffer);
477 gst_rtp_red_dec_dispose (GObject * obj)
479 GstRtpRedDec *self = GST_RTP_RED_DEC (obj);
481 g_hash_table_unref (self->rtp_histories);
483 if (self->payloads) {
484 g_hash_table_unref (self->payloads);
487 g_mutex_clear (&self->lock);
489 G_OBJECT_CLASS (gst_rtp_red_dec_parent_class)->dispose (obj);
493 free_rtp_history (GQueue * rtp_history)
495 g_queue_free_full (rtp_history, rtp_hist_item_free);
499 gst_rtp_red_dec_init (GstRtpRedDec * self)
501 GstPadTemplate *pad_template;
504 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (self), "src");
505 self->srcpad = gst_pad_new_from_template (pad_template, "src");
506 gst_element_add_pad (GST_ELEMENT_CAST (self), self->srcpad);
509 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (self), "sink");
510 self->sinkpad = gst_pad_new_from_template (pad_template, "sink");
511 gst_pad_set_chain_function (self->sinkpad,
512 GST_DEBUG_FUNCPTR (gst_rtp_red_dec_chain));
513 GST_PAD_SET_PROXY_CAPS (self->sinkpad);
514 GST_PAD_SET_PROXY_ALLOCATION (self->sinkpad);
515 gst_element_add_pad (GST_ELEMENT (self), self->sinkpad);
517 self->pt = DEFAULT_PT;
518 self->num_received = 0;
519 self->rtp_histories =
520 g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
521 (GDestroyNotify) free_rtp_history);
522 self->payloads = NULL;
523 g_mutex_init (&self->lock);
/* GObject set_property handler.
 * @object: the GstRtpRedDec instance
 * @prop_id: id of the property being written
 * @value: the new value
 * @pspec: the property's GParamSpec
 *
 * Writing the pt stores the integer in self->pt under self->lock. Writing
 * the payload list, also under self->lock, drops any existing payloads table
 * and fills a new table with the integer elements of the array value as
 * keys. Unknown ids are reported through G_OBJECT_WARN_INVALID_PROPERTY_ID.
 * NOTE(review): the switch/case structure, the local declarations and any
 * condition guarding the creation of the new table are not visible in this
 * view - confirm against the full file. */
527 gst_rtp_red_dec_set_property (GObject * object, guint prop_id,
528 const GValue * value, GParamSpec * pspec)
530 GstRtpRedDec *self = GST_RTP_RED_DEC (object);
534 g_mutex_lock (&self->lock);
535 self->pt = g_value_get_int (value);
536 g_mutex_unlock (&self->lock);
542 g_mutex_lock (&self->lock);
543 if (self->payloads) {
544 g_hash_table_unref (self->payloads);
545 self->payloads = NULL;
548 n_vals = gst_value_array_get_size (value);
551 self->payloads = g_hash_table_new (g_direct_hash, g_direct_equal);
553 for (i = 0; i < gst_value_array_get_size (value); i++) {
554 const GValue *val = gst_value_array_get_value (value, i);
556 g_hash_table_insert (self->payloads,
557 GINT_TO_POINTER (g_value_get_int (val)), NULL);
560 g_mutex_unlock (&self->lock);
564 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* Hash-table iteration callback used when reading the payload list property.
 * @key: a payload type stored as a pointer-packed integer
 * @value: unused
 * @array: the GstValueArray GValue being filled
 *
 * Wraps the key in a G_TYPE_INT GValue and appends it to @array.
 * NOTE(review): the declaration of `v` and any cleanup after the append are
 * not visible in this view. */
570 append_payload (gpointer key, gpointer value, GValue * array)
573 g_value_init (&v, G_TYPE_INT);
574 g_value_set_int (&v, GPOINTER_TO_INT (key));
575 gst_value_array_append_value (array, &v);
/* GObject get_property handler.
 * @object: the GstRtpRedDec instance
 * @prop_id: id of the property being read
 * @value: GValue to fill
 * @pspec: the property's GParamSpec
 *
 * The pt is read under self->lock. The received counter is read directly
 * from self->num_received. The payload list is produced under self->lock by
 * appending every key of self->payloads, if the table is set, to @value via
 * append_payload(). Unknown ids are reported through
 * G_OBJECT_WARN_INVALID_PROPERTY_ID.
 * NOTE(review): num_received is read here without self->lock - confirm this
 * is intended.
 * NOTE(review): the switch/case structure is not visible in this view. */
580 gst_rtp_red_dec_get_property (GObject * object, guint prop_id,
581 GValue * value, GParamSpec * pspec)
583 GstRtpRedDec *self = GST_RTP_RED_DEC (object);
586 g_mutex_lock (&self->lock);
587 g_value_set_int (value, self->pt);
588 g_mutex_unlock (&self->lock);
591 g_value_set_uint (value, self->num_received);
595 g_mutex_lock (&self->lock);
596 if (self->payloads) {
597 g_hash_table_foreach (self->payloads, (GHFunc) append_payload, value);
599 g_mutex_unlock (&self->lock);
603 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
609 gst_rtp_red_dec_class_init (GstRtpRedDecClass * klass)
611 GObjectClass *gobject_class;
612 GstElementClass *element_class;
614 gobject_class = G_OBJECT_CLASS (klass);
615 element_class = GST_ELEMENT_CLASS (klass);
617 gst_element_class_add_pad_template (element_class,
618 gst_static_pad_template_get (&src_template));
619 gst_element_class_add_pad_template (element_class,
620 gst_static_pad_template_get (&sink_template));
622 gst_element_class_set_metadata (element_class,
623 "Redundant Audio Data (RED) Decoder",
624 "Codec/Depayloader/Network/RTP",
625 "Decode Redundant Audio Data (RED)",
626 "Hani Mustafa <hani@pexip.com>, Mikhail Fludkov <misha@pexip.com>");
628 gobject_class->set_property =
629 GST_DEBUG_FUNCPTR (gst_rtp_red_dec_set_property);
630 gobject_class->get_property =
631 GST_DEBUG_FUNCPTR (gst_rtp_red_dec_get_property);
632 gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_rtp_red_dec_dispose);
634 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PT,
635 g_param_spec_int ("pt", "payload type",
636 "Payload type FEC packets",
637 MIN_PT, MAX_PT, DEFAULT_PT,
638 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
640 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_RECEIVED,
641 g_param_spec_uint ("received", "Received",
642 "Count of received packets",
643 0, G_MAXUINT32, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
646 * rtpreddec:payloads:
648 * All the RED payloads this decoder may encounter
652 g_object_class_install_property (G_OBJECT_CLASS (klass),
654 gst_param_spec_array ("payloads",
656 "All the RED payloads this decoder may encounter",
657 g_param_spec_int ("pt",
659 "A RED payload type",
662 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS),
663 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)
666 GST_DEBUG_CATEGORY_INIT (gst_rtp_red_dec_debug, "rtpreddec", 0,