1 /* GStreamer plugin for forward error correction
2 * Copyright (C) 2017 Pexip
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18 * Author: Mikhail Fludkov <misha@pexip.com>
21 #include <gst/rtp/gstrtpbuffer.h>
23 #include "rtpredcommon.h"
24 #include "gstrtpreddec.h"
25 #include "rtpulpfeccommon.h"
27 #define RTP_HISTORY_MAX_SIZE (16)
35 #define RTP_HIST_ITEM_TIMESTAMP(p) ((RTPHistItem *)p)->timestamp
36 #define RTP_HIST_ITEM_SEQ(p) ((RTPHistItem *)p)->seq
38 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
41 GST_STATIC_CAPS ("application/x-rtp"));
43 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
46 GST_STATIC_CAPS ("application/x-rtp"));
49 #define MIN_PT UNDEF_PT
51 #define DEFAULT_PT UNDEF_PT
53 GST_DEBUG_CATEGORY_STATIC (gst_rtp_red_dec_debug);
54 #define GST_CAT_DEFAULT gst_rtp_red_dec_debug
56 G_DEFINE_TYPE (GstRtpRedDec, gst_rtp_red_dec, GST_TYPE_ELEMENT);
66 rtp_hist_item_alloc (void)
68 return g_slice_new (RTPHistItem);
72 rtp_hist_item_free (gpointer item)
74 g_slice_free (RTPHistItem, item);
78 gst_rtp_red_history_find_less_or_equal (gconstpointer item,
79 gconstpointer timestamp)
81 guint32 t = GPOINTER_TO_UINT (timestamp);
82 gint32 diff = t - RTP_HIST_ITEM_TIMESTAMP (item);
87 gst_rtp_red_history_find_less (gconstpointer item, gconstpointer timestamp)
89 guint32 t = GPOINTER_TO_UINT (timestamp);
90 gint32 diff = t - RTP_HIST_ITEM_TIMESTAMP (item);
95 gst_rtp_red_history_update (GstRtpRedDec * self, GstRTPBuffer * rtp)
98 GList *link, *sibling;
100 /* If we have not reached MAX number of elements in the history,
101 * allocate a new link and a new item,
102 * otherwise reuse the tail (the oldest data) without any reallocations
104 if (self->rtp_history->length < RTP_HISTORY_MAX_SIZE) {
105 item = rtp_hist_item_alloc ();
106 link = g_list_alloc ();
109 link = g_queue_pop_tail_link (self->rtp_history);
113 item->timestamp = gst_rtp_buffer_get_timestamp (rtp);
114 item->seq = gst_rtp_buffer_get_seq (rtp);
116 /* Looking for a place to insert new link.
117 * The queue has newest to oldest rtp timestamps, so in 99% cases
118 * it is inserted before the head of the queue */
119 sibling = g_list_find_custom (self->rtp_history->head,
120 GUINT_TO_POINTER (item->timestamp),
121 gst_rtp_red_history_find_less_or_equal);
122 g_queue_push_nth_link (self->rtp_history,
123 g_list_position (self->rtp_history->head, sibling), link);
127 rtp_red_buffer_is_valid (GstRtpRedDec * self, GstRTPBuffer * red_rtp,
128 gsize * dst_first_red_payload_offset)
130 guint8 *payload = gst_rtp_buffer_get_payload (red_rtp);
131 gsize payload_len = gst_rtp_buffer_get_payload_len (red_rtp);
132 gsize red_hdrs_offset = 0;
133 guint red_hdrs_checked = 0;
134 guint redundant_payload_len = 0;
137 gpointer red_hdr = payload + red_hdrs_offset;
139 gboolean is_redundant;
143 /* Can we address the first byte where F bit is located ? */
144 if (red_hdrs_offset + 1 > payload_len)
145 goto red_buffer_invalid;
147 is_redundant = rtp_red_block_is_redundant (red_hdr);
149 /* Is it the last block? */
151 red_hdr_len = rtp_red_block_header_get_length (TRUE);
153 /* Can we address all the other bytes in RED block header? */
154 if (red_hdrs_offset + red_hdr_len > payload_len)
155 goto red_buffer_invalid;
157 redundant_payload_len += rtp_red_block_get_payload_length (red_hdr);
158 red_hdrs_offset += red_hdr_len;
160 red_hdr_len = rtp_red_block_header_get_length (FALSE);
161 red_hdrs_offset += red_hdr_len;
166 /* Do we have enough data to create redundant packets & main packet. Keep in
167 * mind that redundant_payload_len contains the length of redundant packets only.
169 if (red_hdrs_offset + redundant_payload_len >= payload_len)
170 goto red_buffer_invalid;
172 *dst_first_red_payload_offset = red_hdrs_offset;
174 GST_LOG_OBJECT (self, "RED packet has %u blocks", red_hdrs_checked);
178 GST_WARNING_OBJECT (self, "Received invalid RED packet "
179 "ssrc=0x%08x pt=%u tstamp=%u seq=%u size=%u, "
181 gst_rtp_buffer_get_ssrc (red_rtp),
182 gst_rtp_buffer_get_payload_type (red_rtp),
183 gst_rtp_buffer_get_timestamp (red_rtp),
184 gst_rtp_buffer_get_seq (red_rtp),
185 gst_rtp_buffer_get_packet_len (red_rtp), red_hdrs_checked);
190 gst_red_history_lost_seq_num_for_timestamp (GstRtpRedDec * self,
191 guint32 timestamp, guint16 * dst_seq_num)
193 GList *older_sibling = g_list_find_custom (self->rtp_history->head,
194 GUINT_TO_POINTER (timestamp),
195 gst_rtp_red_history_find_less);
198 guint32 timestamp_diff;
201 if (NULL == older_sibling) {
202 if (self->rtp_history->length == RTP_HISTORY_MAX_SIZE)
203 GST_WARNING_OBJECT (self, "History is too short. "
204 "Oldest rtp timestamp %u, looking for %u, size %u",
205 RTP_HIST_ITEM_TIMESTAMP (self->rtp_history->tail->data),
206 timestamp, self->rtp_history->length);
210 if (NULL == older_sibling->prev) {
211 GST_WARNING_OBJECT (self, "RED block timestamp offset probably wrong. "
212 "Latest rtp timestamp %u, looking for %u, size %u",
213 RTP_HIST_ITEM_TIMESTAMP (self->rtp_history->head->data),
214 timestamp, self->rtp_history->length);
218 older = older_sibling->data;
219 newer = older_sibling->prev->data;
220 /* We know for sure @older has lower timestamp than we are looking for,
221 * if @newer has the same timestamp, there is no packet loss and we
222 * don't need to use redundant data */
223 if (newer->timestamp == timestamp)
226 seq_diff = gst_rtp_buffer_compare_seqnum (older->seq, newer->seq);
229 GST_WARNING_OBJECT (self, "RED block timestamp offset is wrong: "
230 "#%u,%u #%u,%u looking for %u",
231 older->seq, older->timestamp,
232 newer->seq, newer->timestamp, timestamp);
234 GST_WARNING_OBJECT (self, "RTP timestamps increasing while "
235 "sequence numbers decreasing: #%u,%u #%u,%u",
236 older->seq, older->timestamp, newer->seq, newer->timestamp);
240 timestamp_diff = newer->timestamp - older->timestamp;
241 for (gint lost_packet_idx = 1; lost_packet_idx < seq_diff; ++lost_packet_idx) {
242 guint32 lost_timestamp = older->timestamp +
243 lost_packet_idx * timestamp_diff / seq_diff;
244 if (lost_timestamp == timestamp) {
245 *dst_seq_num = older->seq + lost_packet_idx;
250 GST_WARNING_OBJECT (self, "Can't find RED block timestamp "
251 "#%u,%u #%u,%u looking for %u",
252 older->seq, older->timestamp, newer->seq, newer->timestamp, timestamp);
257 gst_rtp_red_create_packet (GstRtpRedDec * self, GstRTPBuffer * red_rtp,
258 gboolean marker, guint8 pt, guint16 seq_num, guint32 timestamp,
259 gsize red_payload_subbuffer_start, gsize red_payload_subbuffer_len)
261 guint csrc_count = gst_rtp_buffer_get_csrc_count (red_rtp);
262 GstBuffer *ret = gst_rtp_buffer_new_allocate (0, 0, csrc_count);
263 GstRTPBuffer ret_rtp = GST_RTP_BUFFER_INIT;
264 if (!gst_rtp_buffer_map (ret, GST_MAP_WRITE, &ret_rtp))
265 g_assert_not_reached ();
267 gst_rtp_buffer_set_marker (&ret_rtp, marker);
268 gst_rtp_buffer_set_payload_type (&ret_rtp, pt);
269 gst_rtp_buffer_set_seq (&ret_rtp, seq_num);
270 gst_rtp_buffer_set_timestamp (&ret_rtp, timestamp);
271 gst_rtp_buffer_set_ssrc (&ret_rtp, gst_rtp_buffer_get_ssrc (red_rtp));
272 for (guint i = 0; i < csrc_count; ++i)
273 gst_rtp_buffer_set_csrc (&ret_rtp, i, gst_rtp_buffer_get_csrc (red_rtp, i));
274 gst_rtp_buffer_unmap (&ret_rtp);
276 ret = gst_buffer_append (ret,
277 gst_rtp_buffer_get_payload_subbuffer (red_rtp,
278 red_payload_subbuffer_start, red_payload_subbuffer_len));
280 /* Timestamps, meta, flags from the RED packet should go to main block packet */
281 gst_buffer_copy_into (ret, red_rtp->buffer, GST_BUFFER_COPY_METADATA, 0, -1);
286 gst_rtp_red_create_from_redundant_block (GstRtpRedDec * self,
287 GstRTPBuffer * red_rtp, gsize * red_hdr_offset, gsize * red_payload_offset)
289 guint8 *payload = gst_rtp_buffer_get_payload (red_rtp);
290 guint8 *red_hdr = payload + *red_hdr_offset;
291 guint32 lost_timestamp = gst_rtp_buffer_get_timestamp (red_rtp) -
292 rtp_red_block_get_timestamp_offset (red_hdr);
294 GstBuffer *ret = NULL;
295 guint16 lost_seq = 0;
296 if (gst_red_history_lost_seq_num_for_timestamp (self, lost_timestamp,
299 gst_rtp_red_create_packet (self, red_rtp, FALSE,
300 rtp_red_block_get_payload_type (red_hdr), lost_seq, lost_timestamp,
301 *red_payload_offset, rtp_red_block_get_payload_length (red_hdr));
302 GST_BUFFER_FLAG_SET (ret, GST_RTP_BUFFER_FLAG_REDUNDANT);
305 *red_hdr_offset += rtp_red_block_header_get_length (TRUE);
306 *red_payload_offset += rtp_red_block_get_payload_length (red_hdr);
311 gst_rtp_red_create_from_main_block (GstRtpRedDec * self,
312 GstRTPBuffer * red_rtp, gsize red_hdr_offset, gsize * red_payload_offset)
314 guint8 *payload = gst_rtp_buffer_get_payload (red_rtp);
315 GstBuffer *ret = gst_rtp_red_create_packet (self, red_rtp,
316 gst_rtp_buffer_get_marker (red_rtp),
317 rtp_red_block_get_payload_type (payload + red_hdr_offset),
318 gst_rtp_buffer_get_seq (red_rtp),
319 gst_rtp_buffer_get_timestamp (red_rtp),
320 *red_payload_offset, -1);
321 *red_payload_offset = gst_rtp_buffer_get_payload_len (red_rtp);
326 gst_rtp_red_create_from_block (GstRtpRedDec * self, GstRTPBuffer * red_rtp,
327 gsize * red_hdr_offset, gsize * red_payload_offset)
329 guint8 *payload = gst_rtp_buffer_get_payload (red_rtp);
331 if (rtp_red_block_is_redundant (payload + (*red_hdr_offset)))
332 return gst_rtp_red_create_from_redundant_block (self, red_rtp,
333 red_hdr_offset, red_payload_offset);
335 return gst_rtp_red_create_from_main_block (self, red_rtp, *red_hdr_offset,
340 gst_rtp_red_process (GstRtpRedDec * self, GstRTPBuffer * red_rtp,
341 gsize first_red_payload_offset)
343 gsize red_hdr_offset = 0;
344 gsize red_payload_offset = first_red_payload_offset;
345 gsize payload_len = gst_rtp_buffer_get_payload_len (red_rtp);
346 GstFlowReturn ret = GST_FLOW_OK;
350 gst_rtp_red_create_from_block (self, red_rtp, &red_hdr_offset,
351 &red_payload_offset);
353 ret = gst_pad_push (self->srcpad, buf);
354 } while (GST_FLOW_OK == ret && red_payload_offset < payload_len);
360 gst_rtp_red_dec_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
362 GstRtpRedDec *self = GST_RTP_RED_DEC (parent);
363 GstRTPBuffer irtp = GST_RTP_BUFFER_INIT;
364 GstFlowReturn ret = GST_FLOW_OK;
365 gsize first_red_payload_offset = 0;
367 if (self->pt == UNDEF_PT)
368 return gst_pad_push (self->srcpad, buffer);
370 if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &irtp))
371 return gst_pad_push (self->srcpad, buffer);
373 gst_rtp_red_history_update (self, &irtp);
375 if (self->pt != gst_rtp_buffer_get_payload_type (&irtp)) {
376 GST_LOG_RTP_PACKET (self, "rtp header (incoming)", &irtp);
378 gst_rtp_buffer_unmap (&irtp);
379 return gst_pad_push (self->srcpad, buffer);
382 self->num_received++;
384 if (rtp_red_buffer_is_valid (self, &irtp, &first_red_payload_offset)) {
385 GST_DEBUG_RTP_PACKET (self, "rtp header (red)", &irtp);
386 ret = gst_rtp_red_process (self, &irtp, first_red_payload_offset);
389 gst_rtp_buffer_unmap (&irtp);
390 gst_buffer_unref (buffer);
395 gst_rtp_red_dec_dispose (GObject * obj)
397 GstRtpRedDec *self = GST_RTP_RED_DEC (obj);
399 g_queue_free_full (self->rtp_history, rtp_hist_item_free);
401 G_OBJECT_CLASS (gst_rtp_red_dec_parent_class)->dispose (obj);
405 gst_rtp_red_dec_init (GstRtpRedDec * self)
407 GstPadTemplate *pad_template;
410 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (self), "src");
411 self->srcpad = gst_pad_new_from_template (pad_template, "src");
412 gst_element_add_pad (GST_ELEMENT_CAST (self), self->srcpad);
415 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (self), "sink");
416 self->sinkpad = gst_pad_new_from_template (pad_template, "sink");
417 gst_pad_set_chain_function (self->sinkpad,
418 GST_DEBUG_FUNCPTR (gst_rtp_red_dec_chain));
419 GST_PAD_SET_PROXY_CAPS (self->sinkpad);
420 GST_PAD_SET_PROXY_ALLOCATION (self->sinkpad);
421 gst_element_add_pad (GST_ELEMENT (self), self->sinkpad);
423 self->pt = DEFAULT_PT;
424 self->num_received = 0;
425 self->rtp_history = g_queue_new ();
430 gst_rtp_red_dec_set_property (GObject * object, guint prop_id,
431 const GValue * value, GParamSpec * pspec)
433 GstRtpRedDec *self = GST_RTP_RED_DEC (object);
437 self->pt = g_value_get_int (value);
440 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
446 gst_rtp_red_dec_get_property (GObject * object, guint prop_id,
447 GValue * value, GParamSpec * pspec)
449 GstRtpRedDec *self = GST_RTP_RED_DEC (object);
452 g_value_set_int (value, self->pt);
455 g_value_set_uint (value, self->num_received);
458 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
464 gst_rtp_red_dec_class_init (GstRtpRedDecClass * klass)
466 GObjectClass *gobject_class;
467 GstElementClass *element_class;
469 gobject_class = G_OBJECT_CLASS (klass);
470 element_class = GST_ELEMENT_CLASS (klass);
472 gst_element_class_add_pad_template (element_class,
473 gst_static_pad_template_get (&src_template));
474 gst_element_class_add_pad_template (element_class,
475 gst_static_pad_template_get (&sink_template));
477 gst_element_class_set_metadata (element_class,
478 "Redundant Audio Data (RED) Decoder",
479 "Codec/Depayloader/Network/RTP",
480 "Decode Redundant Audio Data (RED)",
481 "Hani Mustafa <hani@pexip.com>, Mikhail Fludkov <misha@pexip.com>");
483 gobject_class->set_property =
484 GST_DEBUG_FUNCPTR (gst_rtp_red_dec_set_property);
485 gobject_class->get_property =
486 GST_DEBUG_FUNCPTR (gst_rtp_red_dec_get_property);
487 gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_rtp_red_dec_dispose);
489 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PT,
490 g_param_spec_int ("pt", "payload type",
491 "Payload type FEC packets",
492 MIN_PT, MAX_PT, DEFAULT_PT,
493 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
495 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_RECEIVED,
496 g_param_spec_uint ("received", "Received",
497 "Count of received packets",
498 0, G_MAXUINT32, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
500 GST_DEBUG_CATEGORY_INIT (gst_rtp_red_dec_debug, "rtpreddec", 0,